Updates to ECM interface

Change-Id: Ie0cae42ac2b361cf3b94e5047c157cb0945f4209

Adding origin to IntentData and use it to pick GossipIntentStore peer

Change-Id: I50e9621a69a35ec02b8c8dd79cc926591e5a73e9
diff --git a/cli/src/main/java/org/onosproject/cli/net/IntentRemoveCommand.java b/cli/src/main/java/org/onosproject/cli/net/IntentRemoveCommand.java
index 0820958..ef42c32 100644
--- a/cli/src/main/java/org/onosproject/cli/net/IntentRemoveCommand.java
+++ b/cli/src/main/java/org/onosproject/cli/net/IntentRemoveCommand.java
@@ -84,14 +84,13 @@
         Key key = Key.of(new BigInteger(id, 16).longValue(), appId);
         Intent intent = intentService.getIntent(key);
 
-
         if (intent != null) {
             // set up latch and listener to track uninstall progress
             CountDownLatch latch = new CountDownLatch(1);
             IntentListener listener = (IntentEvent event) -> {
                 if (Objects.equals(event.subject().key(), key) &&
-                        (event.type() == IntentEvent.Type.WITHDRAWN
-                                || event.type() == IntentEvent.Type.WITHDRAWN)) {
+                        (event.type() == IntentEvent.Type.WITHDRAWN ||
+                         event.type() == IntentEvent.Type.FAILED)) {
                     latch.countDown();
                 }
             };
diff --git a/core/api/src/main/java/org/onosproject/net/intent/IntentData.java b/core/api/src/main/java/org/onosproject/net/intent/IntentData.java
index f1776a9..1bfa0dc 100644
--- a/core/api/src/main/java/org/onosproject/net/intent/IntentData.java
+++ b/core/api/src/main/java/org/onosproject/net/intent/IntentData.java
@@ -17,6 +17,7 @@
 
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.store.Timestamp;
 
 import java.util.List;
@@ -32,6 +33,7 @@
 
     private IntentState state;
     private Timestamp version;
+    private NodeId origin;
 
     private List<Intent> installables;
 
@@ -61,6 +63,19 @@
         return version;
     }
 
+    /**
+     * Sets the origin, which is the node that created the instance.
+     *
+     * @param origin origin instance
+     */
+    public void setOrigin(NodeId origin) {
+        this.origin = origin;
+    }
+
+    public NodeId origin() {
+        return origin;
+    }
+
     public void setState(IntentState newState) {
         this.state = newState;
     }
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
index c33d2ea..cbf2398 100644
--- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
@@ -16,11 +16,9 @@
 package org.onosproject.store.cluster.messaging;
 
 import com.google.common.util.concurrent.ListenableFuture;
-
 import org.onosproject.cluster.NodeId;
 
 import java.io.IOException;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
 // TODO: remove IOExceptions?
@@ -51,9 +49,8 @@
      * @param message  message to send
      * @param toNodeId node identifier
      * @return true if the message was sent successfully; false otherwise.
-     * @throws IOException when I/O exception of some sort has occurred
      */
-    boolean unicast(ClusterMessage message, NodeId toNodeId) throws IOException;
+    boolean unicast(ClusterMessage message, NodeId toNodeId);
 
     /**
      * Multicast a message to a set of controller nodes.
@@ -62,7 +59,7 @@
      * @param nodeIds  recipient node identifiers
      * @return true if the message was sent successfully to all nodes in the group; false otherwise.
      */
-    boolean multicast(ClusterMessage message, Set<NodeId> nodeIds);
+    boolean multicast(ClusterMessage message, Iterable<NodeId> nodeIds);
 
     /**
      * Sends a message synchronously.
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 59671c0..42a79b3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -16,7 +16,6 @@
 package org.onosproject.store.cluster.messaging.impl;
 
 import com.google.common.util.concurrent.ListenableFuture;
-
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -39,7 +38,6 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -107,7 +105,7 @@
     }
 
     @Override
-    public boolean multicast(ClusterMessage message, Set<NodeId> nodes) {
+    public boolean multicast(ClusterMessage message, Iterable<NodeId> nodes) {
         boolean ok = true;
         final ControllerNode localNode = clusterService.getLocalNode();
         byte[] payload = message.getBytes();
@@ -120,8 +118,8 @@
     }
 
     @Override
-    public boolean unicast(ClusterMessage message, NodeId toNodeId) throws IOException {
-        return unicast(message.subject(), message.getBytes(), toNodeId);
+    public boolean unicast(ClusterMessage message, NodeId toNodeId) {
+        return unicastUnchecked(message.subject(), message.getBytes(), toNodeId);
     }
 
     private boolean unicast(MessageSubject subject, byte[] payload, NodeId toNodeId) throws IOException {
@@ -137,7 +135,6 @@
         }
     }
 
-
     private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
         try {
             return unicast(subject, payload, toNodeId);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
index 56d01a0..06a60d5 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
@@ -21,7 +21,6 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -305,14 +304,13 @@
             ClusterMessage clusterMessage = new ClusterMessage(localNode, DEVICE_INJECTED,
                     SERIALIZER.encode(deviceInjectedEvent));
 
-            try {
-                clusterCommunicator.unicast(clusterMessage, deviceNode);
-            } catch (IOException e) {
-                log.warn("Failed to process injected device id: {} desc: {} " +
-                                "(cluster messaging failed: {})",
-                        deviceId, deviceDescription, e);
-            }
-
+            // TODO check unicast return value
+            clusterCommunicator.unicast(clusterMessage, deviceNode);
+            /* error log:
+            log.warn("Failed to process injected device id: {} desc: {} " +
+                            "(cluster messaging failed: {})",
+                    deviceId, deviceDescription, e);
+            */
         }
 
         return deviceEvent;
@@ -556,13 +554,14 @@
             PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, portDescriptions);
             ClusterMessage clusterMessage = new ClusterMessage(
                     localNode, PORT_INJECTED, SERIALIZER.encode(portInjectedEvent));
-            try {
-                clusterCommunicator.unicast(clusterMessage, deviceNode);
-            } catch (IOException e) {
-                log.warn("Failed to process injected ports of device id: {} " +
-                                "(cluster messaging failed: {})",
-                        deviceId, e);
-            }
+
+            //TODO check unicast return value
+            clusterCommunicator.unicast(clusterMessage, deviceNode);
+            /* error log:
+            log.warn("Failed to process injected ports of device id: {} " +
+                            "(cluster messaging failed: {})",
+                    deviceId, e);
+            */
         }
 
         return deviceEvents == null ? Collections.emptyList() : deviceEvents;
@@ -842,13 +841,13 @@
                      DEVICE_REMOVE_REQ,
                      SERIALIZER.encode(deviceId));
 
-             try {
-                 clusterCommunicator.unicast(message, master);
-             } catch (IOException e) {
-                 log.error("Failed to forward {} remove request to {}", deviceId, master, e);
-             }
+            // TODO check unicast return value
+            clusterCommunicator.unicast(message, master);
+             /* error log:
+             log.error("Failed to forward {} remove request to {}", deviceId, master, e);
+             */
 
-             // event will be triggered after master processes it.
+            // event will be triggered after master processes it.
              return null;
         }
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 8a178cb..8f99d0e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -35,7 +35,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -51,6 +50,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -87,6 +87,7 @@
     private final ExecutorService executor;
 
     private final ScheduledExecutorService backgroundExecutor;
+    private final BiFunction<K, V, Iterable<NodeId>> peerUpdateFunction;
 
     private ExecutorService broadcastMessageExecutor;
 
@@ -140,14 +141,18 @@
      *                            both K and V
      * @param clockService        a clock service able to generate timestamps
      *                            for K
+     * @param peerUpdateFunction  function that provides a set of nodes to immediately
+     *                            update to when there writes to the map
      */
     public EventuallyConsistentMapImpl(String mapName,
                                        ClusterService clusterService,
                                        ClusterCommunicationService clusterCommunicator,
                                        KryoNamespace.Builder serializerBuilder,
-                                       ClockService<K, V> clockService) {
+                                       ClockService<K, V> clockService,
+                                       BiFunction<K, V, Iterable<NodeId>> peerUpdateFunction) {
         this.clusterService = checkNotNull(clusterService);
         this.clusterCommunicator = checkNotNull(clusterCommunicator);
+        this.peerUpdateFunction = checkNotNull(peerUpdateFunction);
 
         serializer = createSerializer(checkNotNull(serializerBuilder));
         destroyedMessage = mapName + ERROR_DESTROYED;
@@ -189,6 +194,34 @@
                                           new InternalAntiEntropyListener(), backgroundExecutor);
     }
 
+    /**
+     * Creates a new eventually consistent map shared amongst multiple instances.
+     * <p>
+     * Take a look at the other constructor for usage information. The only difference
+     * is that a BiFunction is provided that returns all nodes in the cluster, so
+     * all nodes will be sent write updates immediately.
+     * </p>
+     *
+     * @param mapName             a String identifier for the map.
+     * @param clusterService      the cluster service
+     * @param clusterCommunicator the cluster communications service
+     * @param serializerBuilder   a Kryo namespace builder that can serialize
+     *                            both K and V
+     * @param clockService        a clock service able to generate timestamps
+     *                            for K
+     */
+    public EventuallyConsistentMapImpl(String mapName,
+                                       ClusterService clusterService,
+                                       ClusterCommunicationService clusterCommunicator,
+                                       KryoNamespace.Builder serializerBuilder,
+                                       ClockService<K, V> clockService) {
+        this(mapName, clusterService, clusterCommunicator, serializerBuilder, clockService,
+             (key, value) -> clusterService.getNodes().stream()
+                     .map(ControllerNode::id)
+                     .filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id()))
+                     .collect(Collectors.toList()));
+    }
+
     private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
         return new KryoSerializer() {
             @Override
@@ -270,11 +303,10 @@
         Timestamp timestamp = clockService.getTimestamp(key, value);
 
         if (putInternal(key, value, timestamp)) {
-            notifyPeers(new InternalPutEvent<>(key, value, timestamp));
-            EventuallyConsistentMapEvent<K, V> externalEvent
-                    = new EventuallyConsistentMapEvent<>(
-                    EventuallyConsistentMapEvent.Type.PUT, key, value);
-            notifyListeners(externalEvent);
+            notifyPeers(new InternalPutEvent<>(key, value, timestamp),
+                        peerUpdateFunction.apply(key, value));
+            notifyListeners(new EventuallyConsistentMapEvent<>(
+                    EventuallyConsistentMapEvent.Type.PUT, key, value));
         }
     }
 
@@ -318,11 +350,10 @@
         Timestamp timestamp = clockService.getTimestamp(key, null);
 
         if (removeInternal(key, timestamp)) {
-            notifyPeers(new InternalRemoveEvent<>(key, timestamp));
-            EventuallyConsistentMapEvent<K, V> externalEvent
-                    = new EventuallyConsistentMapEvent<>(
-                    EventuallyConsistentMapEvent.Type.REMOVE, key, null);
-            notifyListeners(externalEvent);
+            notifyPeers(new InternalRemoveEvent<>(key, timestamp),
+                        peerUpdateFunction.apply(key, null));
+            notifyListeners(new EventuallyConsistentMapEvent<>(
+                    EventuallyConsistentMapEvent.Type.REMOVE, key, null));
         }
     }
 
@@ -364,11 +395,10 @@
         Timestamp timestamp = clockService.getTimestamp(key, value);
 
         if (removeInternal(key, timestamp)) {
-            notifyPeers(new InternalRemoveEvent<>(key, timestamp));
-            EventuallyConsistentMapEvent<K, V> externalEvent
-                    = new EventuallyConsistentMapEvent<>(
-                    EventuallyConsistentMapEvent.Type.REMOVE, key, value);
-            notifyListeners(externalEvent);
+            notifyPeers(new InternalRemoveEvent<>(key, timestamp),
+                        peerUpdateFunction.apply(key, value));
+            notifyListeners(new EventuallyConsistentMapEvent<>(
+                    EventuallyConsistentMapEvent.Type.REMOVE, key, value));
         }
     }
 
@@ -393,7 +423,7 @@
         }
 
         if (!updates.isEmpty()) {
-            notifyPeers(new InternalPutEvent<>(updates));
+            broadcastMessage(updateMessageSubject, new InternalPutEvent<>(updates));
 
             for (PutEntry<K, V> entry : updates) {
                 EventuallyConsistentMapEvent<K, V> externalEvent =
@@ -421,7 +451,7 @@
         }
 
         if (!removed.isEmpty()) {
-            notifyPeers(new InternalRemoveEvent<>(removed));
+            broadcastMessage(removeMessageSubject, new InternalRemoveEvent<>(removed));
 
             for (RemoveEntry<K> entry : removed) {
                 EventuallyConsistentMapEvent<K, V> externalEvent
@@ -493,16 +523,26 @@
         }
     }
 
-    private void notifyPeers(InternalPutEvent event) {
+    private void notifyPeers(InternalPutEvent event, Iterable<NodeId> peers) {
         // FIXME extremely memory expensive when we are overrun
 //        broadcastMessageExecutor.execute(() -> broadcastMessage(updateMessageSubject, event));
-        broadcastMessage(updateMessageSubject, event);
+        multicastMessage(updateMessageSubject, event, peers);
     }
 
-    private void notifyPeers(InternalRemoveEvent event) {
+    private void notifyPeers(InternalRemoveEvent event, Iterable<NodeId> peers) {
         // FIXME extremely memory expensive when we are overrun
 //        broadcastMessageExecutor.execute(() -> broadcastMessage(removeMessageSubject, event));
-        broadcastMessage(removeMessageSubject, event);
+        multicastMessage(removeMessageSubject, event, peers);
+    }
+
+    private void multicastMessage(MessageSubject subject, Object event, Iterable<NodeId> peers) {
+        // FIXME can we parallelize the serialization... use the caller???
+        ClusterMessage message = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                subject,
+                serializer.encode(event));
+        broadcastMessageExecutor.execute(() -> clusterCommunicator.multicast(message, peers));
+//        clusterCommunicator.broadcast(message);
     }
 
     private void broadcastMessage(MessageSubject subject, Object event) {
@@ -515,14 +555,13 @@
 //        clusterCommunicator.broadcast(message);
     }
 
-    private void unicastMessage(NodeId peer,
-                                MessageSubject subject,
-                                Object event) throws IOException {
+    private void unicastMessage(NodeId peer, MessageSubject subject, Object event) {
         ClusterMessage message = new ClusterMessage(
                 clusterService.getLocalNode().id(),
                 subject,
                 serializer.encode(event));
-        clusterCommunicator.unicast(message, peer);
+//        clusterCommunicator.unicast(message, peer);
+        broadcastMessageExecutor.execute(() -> clusterCommunicator.unicast(message, peer));
     }
 
     private boolean underHighLoad() {
@@ -567,11 +606,9 @@
 
                 AntiEntropyAdvertisement<K> ad = createAdvertisement();
 
-                try {
-                    unicastMessage(peer, antiEntropyAdvertisementSubject, ad);
-                } catch (IOException e) {
-                    log.debug("Failed to send anti-entropy advertisement to {}", peer);
-                }
+                // TODO check the return value?
+                unicastMessage(peer, antiEntropyAdvertisementSubject, ad);
+                // error log: log.debug("Failed to send anti-entropy advertisement to {}", peer);
             } catch (Exception e) {
                 // Catch all exceptions to avoid scheduled task being suppressed.
                 log.error("Exception thrown while sending advertisement", e);
@@ -607,14 +644,9 @@
                     // Send the advertisement back if this peer is out-of-sync
                     final NodeId sender = ad.sender();
                     AntiEntropyAdvertisement<K> myAd = createAdvertisement();
-                    try {
-                        unicastMessage(sender, antiEntropyAdvertisementSubject, myAd);
-                    } catch (IOException e) {
-                        log.debug(
-                                "Failed to send reactive anti-entropy advertisement to {}",
-                                sender);
-                    }
-
+                    // TODO check the return value?
+                    unicastMessage(sender, antiEntropyAdvertisementSubject, myAd);
+                    // error log: log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
                     break;
                 }
             }
@@ -669,12 +701,10 @@
 
         // Send all updates to the peer at once
         if (!updatesToSend.isEmpty()) {
-            try {
-                unicastMessage(sender, updateMessageSubject,
-                               new InternalPutEvent<>(updatesToSend));
-            } catch (IOException e) {
-                log.warn("Failed to send advertisement response", e);
-            }
+            // TODO check the return value?
+            unicastMessage(sender, updateMessageSubject,
+                           new InternalPutEvent<>(updatesToSend));
+            //error log: log.warn("Failed to send advertisement response", e);
         }
 
         return externalEvents;
@@ -707,12 +737,10 @@
 
         // Send all removes to the peer at once
         if (!removesToSend.isEmpty()) {
-            try {
-                unicastMessage(sender, removeMessageSubject,
-                               new InternalRemoveEvent<>(removesToSend));
-            } catch (IOException e) {
-                log.warn("Failed to send advertisement response", e);
-            }
+            // TODO check the return value
+            unicastMessage(sender, removeMessageSubject,
+                           new InternalRemoveEvent<>(removesToSend));
+            // error log: log.warn("Failed to send advertisement response", e);
         }
     }
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index 80f2914..450bb3d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -29,6 +29,7 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.BoundedThreadPool;
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.NewConcurrentHashMap;
 import org.onosproject.cluster.ClusterService;
@@ -138,7 +139,8 @@
     private ExecutorService messageHandlingExecutor;
 
     private final ExecutorService backupExecutors =
-            Executors.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
+            BoundedThreadPool.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
+            //Executors.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
 
     private boolean syncBackup = false;
 
@@ -385,12 +387,8 @@
                 SERIALIZER.encode(operation));
 
 
-        try {
-
-            clusterCommunicator.unicast(message, replicaInfo.master().get());
-
-        } catch (IOException e) {
-            log.warn("Failed to storeBatch: {}", e.getMessage());
+        if (!clusterCommunicator.unicast(message, replicaInfo.master().get())) {
+            log.warn("Failed to storeBatch: {} to {}", message, replicaInfo.master());
 
             Set<FlowRule> allFailures = operation.getOperations().stream()
                     .map(op -> op.target())
@@ -401,7 +399,6 @@
                     new CompletedBatchOperation(false, allFailures, deviceId)));
             return;
         }
-
     }
 
     private void storeBatchInternal(FlowRuleBatchOperation operation) {
@@ -576,15 +573,13 @@
         if (nodeId == null) {
             notifyDelegate(event);
         } else {
-            try {
-                ClusterMessage message = new ClusterMessage(
-                        clusterService.getLocalNode().id(),
-                        REMOTE_APPLY_COMPLETED,
-                        SERIALIZER.encode(event));
-                clusterCommunicator.unicast(message, nodeId);
-            } catch (IOException e) {
-                log.warn("Failed to respond to peer for batch operation result");
-            }
+            ClusterMessage message = new ClusterMessage(
+                    clusterService.getLocalNode().id(),
+                    REMOTE_APPLY_COMPLETED,
+                    SERIALIZER.encode(event));
+            // TODO check unicast return value
+            clusterCommunicator.unicast(message, nodeId);
+            //error log: log.warn("Failed to respond to peer for batch operation result");
         }
     }
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
index bbdbdb7..2bc992b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.store.intent.impl;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -23,6 +24,8 @@
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.util.KryoNamespace;
 import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.net.intent.Intent;
 import org.onosproject.net.intent.IntentData;
 import org.onosproject.net.intent.IntentEvent;
@@ -41,7 +44,10 @@
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.slf4j.Logger;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 import static org.onosproject.net.intent.IntentState.*;
@@ -51,6 +57,8 @@
  * Manages inventory of Intents in a distributed data store that uses optimistic
  * replication and gossip based techniques.
  */
+//FIXME we should listen for leadership changes. if the local instance has just
+// ...  become a leader, scan the pending map and process those
 @Component(immediate = false, enabled = true)
 @Service
 public class GossipIntentStore
@@ -86,15 +94,17 @@
                                                        clusterService,
                                                        clusterCommunicator,
                                                        intentSerializer,
-                                                       new IntentDataLogicalClockManager<>());
+                                                       new IntentDataLogicalClockManager<>(),
+                                                       (key, intentData) -> getPeerNodes(key, intentData));
 
         pendingMap = new EventuallyConsistentMapImpl<>("intent-pending",
                                                        clusterService,
                                                        clusterCommunicator,
                                                        intentSerializer, // TODO
-                                                       new IntentDataClockManager<>());
+                                                       new IntentDataClockManager<>(),
+                                                       (key, intentData) -> getPeerNodes(key, intentData));
 
-        currentMap.addListener(new InternalIntentStatesListener());
+        currentMap.addListener(new InternalCurrentListener());
         pendingMap.addListener(new InternalPendingListener());
 
         log.info("Started");
@@ -226,7 +236,6 @@
     @Override
     public void write(IntentData newData) {
         IntentData currentData = currentMap.get(newData.key());
-
         if (isUpdateAcceptable(currentData, newData)) {
             // Only the master is modifying the current state. Therefore assume
             // this always succeeds
@@ -239,6 +248,34 @@
         }
     }
 
+    private Iterable<NodeId> getPeerNodes(Key key, IntentData data) {
+        NodeId master = partitionService.getLeader(key);
+        NodeId origin = (data != null) ? data.origin() : null;
+        NodeId me = clusterService.getLocalNode().id();
+        boolean isMaster = Objects.equals(master, me);
+        boolean isOrigin = Objects.equals(origin, me);
+        if (isMaster && isOrigin) {
+            return ImmutableList.of(getRandomNode());
+        } else if (isMaster) {
+            return ImmutableList.of(origin);
+        } else if (isOrigin) {
+            return ImmutableList.of(master);
+        } else {
+            // FIXME: why are we here? log error?
+            return ImmutableList.of(master);
+        }
+    }
+
+    private NodeId getRandomNode() {
+        List<NodeId> nodes = clusterService.getNodes().stream()
+                                .map(ControllerNode::id)
+                .collect(Collectors.toCollection(ArrayList::new));
+        Collections.shuffle(nodes);
+        // FIXME check if self
+        // FIXME verify nodes.size() > 0
+        return nodes.get(0);
+    }
+
     @Override
     public void batchWrite(Iterable<IntentData> updates) {
         updates.forEach(this::write);
@@ -263,6 +300,7 @@
         if (data.version() == null) {
             data.setVersion(new WallClockTimestamp());
         }
+        data.setOrigin(clusterService.getLocalNode().id());
         pendingMap.put(data.key(), copyData(data));
     }
 
@@ -292,7 +330,7 @@
         }
     }
 
-    private final class InternalIntentStatesListener implements
+    private final class InternalCurrentListener implements
             EventuallyConsistentMapListener<Key, IntentData> {
         @Override
         public void event(
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
index 07b89fb..5fa26a7 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
@@ -29,13 +29,13 @@
 import org.onosproject.cluster.LeadershipEvent;
 import org.onosproject.cluster.LeadershipEventListener;
 import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.net.intent.Key;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Objects;
-
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -109,8 +109,13 @@
 
     @Override
     public boolean isMine(Key intentKey) {
-        return Objects.equal(leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey))),
-                             clusterService.getLocalNode().id());
+        return Objects.equals(leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey))),
+                              clusterService.getLocalNode().id());
+    }
+
+    @Override
+    public NodeId getLeader(Key intentKey) {
+        return leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey)));
     }
 
     private void doRelinquish() {
@@ -171,7 +176,7 @@
         public void event(LeadershipEvent event) {
             Leadership leadership = event.subject();
 
-            if (Objects.equal(leadership.leader(), clusterService.getLocalNode().id()) &&
+            if (Objects.equals(leadership.leader(), clusterService.getLocalNode().id()) &&
                     leadership.topic().startsWith(ELECTION_PREFIX)) {
 
                 // See if we need to let some partitions go
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionService.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionService.java
index eaeabab1..2ee4434 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionService.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionService.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.store.intent.impl;
 
+import org.onosproject.cluster.NodeId;
 import org.onosproject.net.intent.Key;
 
 /**
@@ -31,5 +32,13 @@
      */
     boolean isMine(Key intentKey);
 
+    /**
+     * Returns the leader for a particular key.
+     *
+     * @param intentKey intent key to query
+     * @return the leader node
+     */
+    NodeId getLeader(Key intentKey);
+
     // TODO add API for rebalancing partitions
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
index d555069..75cb96c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
@@ -337,13 +337,13 @@
             ClusterMessage linkInjectedMessage = new ClusterMessage(localNode,
                     GossipLinkStoreMessageSubjects.LINK_INJECTED, SERIALIZER.encode(linkInjectedEvent));
 
-            try {
-                clusterCommunicator.unicast(linkInjectedMessage, dstNode);
-            } catch (IOException e) {
-                log.warn("Failed to process link update between src: {} and dst: {} " +
-                                "(cluster messaging failed: {})",
-                        linkDescription.src(), linkDescription.dst(), e);
-            }
+            // TODO check unicast return value
+            clusterCommunicator.unicast(linkInjectedMessage, dstNode);
+            /* error log:
+            log.warn("Failed to process link update between src: {} and dst: {} " +
+                            "(cluster messaging failed: {})",
+                    linkDescription.src(), linkDescription.dst(), e);
+            */
 
         }
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index eb0c38a..45b540d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -15,19 +15,13 @@
  */
 package org.onosproject.store.packet.impl;
 
-import static org.onlab.util.Tools.groupedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.mastership.MastershipService;
@@ -43,9 +37,14 @@
 import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.serializers.KryoSerializer;
-import org.onlab.util.KryoNamespace;
 import org.slf4j.Logger;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
 /**
  * Distributed packet store implementation allowing packets to be sent to
  * remote instances.
@@ -118,12 +117,10 @@
             return;
         }
 
-        try {
-            communicationService.unicast(new ClusterMessage(
-                    myId, PACKET_OUT_SUBJECT, SERIALIZER.encode(packet)), master);
-        } catch (IOException e) {
-            log.warn("Failed to send packet-out to {}", master);
-        }
+        // TODO check unicast return value
+        communicationService.unicast(new ClusterMessage(
+                myId, PACKET_OUT_SUBJECT, SERIALIZER.encode(packet)), master);
+        // error log: log.warn("Failed to send packet-out to {}", master);
     }
 
     /**
diff --git a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
index e6670de..5ed6384 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
@@ -16,9 +16,9 @@
 package org.onosproject.store.ecmap;
 
 import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -53,10 +53,7 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static junit.framework.TestCase.assertFalse;
 import static org.easymock.EasyMock.*;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
 /**
  * Unit tests for EventuallyConsistentMapImpl.
@@ -119,8 +116,8 @@
     @Before
     public void setUp() throws Exception {
         clusterService = createMock(ClusterService.class);
-        expect(clusterService.getLocalNode()).andReturn(self)
-                .anyTimes();
+        expect(clusterService.getLocalNode()).andReturn(self).anyTimes();
+        expect(clusterService.getNodes()).andReturn(ImmutableSet.of(self)).anyTimes();
         replay(clusterService);
 
         clusterCommunicator = createMock(ClusterCommunicationService.class);
@@ -163,7 +160,7 @@
 
     @Test
     public void testSize() throws Exception {
-        expectAnyMessage(clusterCommunicator);
+        expectPeerMessage(clusterCommunicator);
 
         assertEquals(0, ecMap.size());
         ecMap.put(KEY1, VALUE1);
@@ -184,7 +181,7 @@
 
     @Test
     public void testIsEmpty() throws Exception {
-        expectAnyMessage(clusterCommunicator);
+        expectPeerMessage(clusterCommunicator);
 
         assertTrue(ecMap.isEmpty());
         ecMap.put(KEY1, VALUE1);
@@ -195,7 +192,7 @@
 
     @Test
     public void testContainsKey() throws Exception {
-        expectAnyMessage(clusterCommunicator);
+        expectPeerMessage(clusterCommunicator);
 
         assertFalse(ecMap.containsKey(KEY1));
         ecMap.put(KEY1, VALUE1);
@@ -207,7 +204,7 @@
 
     @Test
     public void testContainsValue() throws Exception {
-        expectAnyMessage(clusterCommunicator);
+        expectPeerMessage(clusterCommunicator);
 
         assertFalse(ecMap.containsValue(VALUE1));
         ecMap.put(KEY1, VALUE1);
@@ -222,7 +219,7 @@
 
     @Test
     public void testGet() throws Exception {
-        expectAnyMessage(clusterCommunicator);
+        expectPeerMessage(clusterCommunicator);
 
         CountDownLatch latch;
 
@@ -278,7 +275,7 @@
         ecMap.addListener(listener);
 
         // Set up expected internal message to be broadcast to peers on first put
-        expectSpecificMessage(generatePutMessage(KEY1, VALUE1, clockService
+        expectSpecificMulticastMessage(generatePutMessage(KEY1, VALUE1, clockService
                 .peekAtNextTimestamp()), clusterCommunicator);
 
         // Put first value
@@ -289,7 +286,7 @@
         verify(clusterCommunicator);
 
         // Set up expected internal message to be broadcast to peers on second put
-        expectSpecificMessage(generatePutMessage(
+        expectSpecificMulticastMessage(generatePutMessage(
                 KEY1, VALUE2, clockService.peekAtNextTimestamp()), clusterCommunicator);
 
         // Update same key to a new value
@@ -332,14 +329,14 @@
         ecMap.addListener(listener);
 
         // Put in an initial value
-        expectAnyMessage(clusterCommunicator);
+        expectPeerMessage(clusterCommunicator);
         ecMap.put(KEY1, VALUE1);
         assertEquals(VALUE1, ecMap.get(KEY1));
 
         // Remove the value and check the correct internal cluster messages
         // are sent
-        expectSpecificMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
-                              clusterCommunicator);
+        expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
+                                       clusterCommunicator);
 
         ecMap.remove(KEY1);
         assertNull(ecMap.get(KEY1));
@@ -349,8 +346,8 @@
         // Remove the same value again. Even though the value is no longer in
         // the map, we expect that the tombstone is updated and another remove
         // event is sent to the cluster and external listeners.
-        expectSpecificMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
-                              clusterCommunicator);
+        expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
+                                       clusterCommunicator);
 
         ecMap.remove(KEY1);
         assertNull(ecMap.get(KEY1));
@@ -359,7 +356,7 @@
 
 
         // Put in a new value for us to try and remove
-        expectAnyMessage(clusterCommunicator);
+        expectPeerMessage(clusterCommunicator);
 
         ecMap.put(KEY2, VALUE2);
 
@@ -400,8 +397,8 @@
         ecMap.addListener(listener);
 
         // Expect a multi-update inter-instance message
-        expectSpecificMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2),
-                              clusterCommunicator);
+        expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2),
+                                       clusterCommunicator);
 
         Map<String, String> putAllValues = new HashMap<>();
         putAllValues.put(KEY1, VALUE1);
@@ -434,12 +431,12 @@
         verify(clusterCommunicator);
 
         // Put some items in the map
-        expectAnyMessage(clusterCommunicator);
+        expectPeerMessage(clusterCommunicator);
         ecMap.put(KEY1, VALUE1);
         ecMap.put(KEY2, VALUE2);
 
         ecMap.addListener(listener);
-        expectSpecificMessage(generateRemoveMessage(KEY1, KEY2), clusterCommunicator);
+        expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), clusterCommunicator);
 
         ecMap.clear();
 
@@ -449,7 +446,7 @@
 
     @Test
     public void testKeySet() throws Exception {
-        expectAnyMessage(clusterCommunicator);
+        expectPeerMessage(clusterCommunicator);
 
         assertTrue(ecMap.keySet().isEmpty());
 
@@ -482,7 +479,7 @@
 
     @Test
     public void testValues() throws Exception {
-        expectAnyMessage(clusterCommunicator);
+        expectPeerMessage(clusterCommunicator);
 
         assertTrue(ecMap.values().isEmpty());
 
@@ -520,7 +517,7 @@
 
     @Test
     public void testEntrySet() throws Exception {
-        expectAnyMessage(clusterCommunicator);
+        expectPeerMessage(clusterCommunicator);
 
         assertTrue(ecMap.entrySet().isEmpty());
 
@@ -658,21 +655,52 @@
      * @param m message we expect to be sent
      * @param clusterCommunicator a mock ClusterCommunicationService to set up
      */
-    private static void expectSpecificMessage(ClusterMessage m,
-            ClusterCommunicationService clusterCommunicator) {
+    private static void expectSpecificBroadcastMessage(ClusterMessage m,
+                           ClusterCommunicationService clusterCommunicator) {
         reset(clusterCommunicator);
         expect(clusterCommunicator.broadcast(m)).andReturn(true);
         replay(clusterCommunicator);
     }
 
     /**
-     * Sets up a mock ClusterCommunicationService to expect any cluster message
+     * Sets up a mock ClusterCommunicationService to expect a specific cluster
+     * message to be multicast to the cluster.
+     *
+     * @param m message we expect to be sent
+     * @param clusterCommunicator a mock ClusterCommunicationService to set up
+     */
+    private static void expectSpecificMulticastMessage(ClusterMessage m,
+                           ClusterCommunicationService clusterCommunicator) {
+        reset(clusterCommunicator);
+        expect(clusterCommunicator.multicast(eq(m), anyObject(Set.class))).andReturn(true);
+        replay(clusterCommunicator);
+    }
+
+
+    /**
+     * Sets up a mock ClusterCommunicationService to expect a multicast cluster message
      * that is sent to it. This is useful for unit tests where we aren't
      * interested in testing the messaging component.
      *
      * @param clusterCommunicator a mock ClusterCommunicationService to set up
      */
-    private void expectAnyMessage(ClusterCommunicationService clusterCommunicator) {
+    private void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
+        reset(clusterCommunicator);
+        expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
+                                             anyObject(Iterable.class)))
+                .andReturn(true)
+                .anyTimes();
+        replay(clusterCommunicator);
+    }
+
+    /**
+     * Sets up a mock ClusterCommunicationService to expect a broadcast cluster message
+     * that is sent to it. This is useful for unit tests where we aren't
+     * interested in testing the messaging component.
+     *
+     * @param clusterCommunicator a mock ClusterCommunicationService to set up
+     */
+    private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) {
         reset(clusterCommunicator);
         expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
                 .andReturn(true)
@@ -700,13 +728,12 @@
         }
 
         @Override
-        public boolean unicast(ClusterMessage message, NodeId toNodeId)
-                throws IOException {
+        public boolean unicast(ClusterMessage message, NodeId toNodeId)  {
             return false;
         }
 
         @Override
-        public boolean multicast(ClusterMessage message, Set<NodeId> nodeIds) {
+        public boolean multicast(ClusterMessage message, Iterable<NodeId> nodeIds) {
             return false;
         }
 
diff --git a/utils/misc/src/test/java/org/onlab/util/BlockingBooleanTest.java b/utils/misc/src/test/java/org/onlab/util/BlockingBooleanTest.java
index 087b692..2d8b688 100644
--- a/utils/misc/src/test/java/org/onlab/util/BlockingBooleanTest.java
+++ b/utils/misc/src/test/java/org/onlab/util/BlockingBooleanTest.java
@@ -31,6 +31,8 @@
  */
 public class BlockingBooleanTest  {
 
+    private static final int TIMEOUT = 100; //ms
+
     @Test
     public void basics() {
         BlockingBoolean b = new BlockingBoolean(false);
@@ -60,7 +62,7 @@
         }
         b.set(value);
         try {
-            assertTrue(latch.await(10, TimeUnit.MILLISECONDS));
+            assertTrue(latch.await(TIMEOUT, TimeUnit.MILLISECONDS));
         } catch (InterruptedException e) {
             fail();
         }
@@ -92,7 +94,7 @@
             }
         });
         try {
-            assertTrue(latch.await(10, TimeUnit.MILLISECONDS));
+            assertTrue(latch.await(TIMEOUT, TimeUnit.MILLISECONDS));
         } catch (InterruptedException e) {
             fail();
         }
@@ -124,14 +126,14 @@
             });
         }
         try {
-            assertTrue(sameLatch.await(10, TimeUnit.MILLISECONDS));
+            assertTrue(sameLatch.await(TIMEOUT, TimeUnit.MILLISECONDS));
             assertEquals(waitLatch.getCount(), numThreads / 2);
         } catch (InterruptedException e) {
             fail();
         }
         b.set(true);
         try {
-            assertTrue(waitLatch.await(10, TimeUnit.MILLISECONDS));
+            assertTrue(waitLatch.await(TIMEOUT, TimeUnit.MILLISECONDS));
         } catch (InterruptedException e) {
             fail();
         }
@@ -156,7 +158,7 @@
             }
         });
         try {
-            assertTrue(latch.await(10, TimeUnit.MILLISECONDS));
+            assertTrue(latch.await(TIMEOUT, TimeUnit.MILLISECONDS));
         } catch (InterruptedException e) {
             fail();
         }