Updates to the ECM (EventuallyConsistentMap) interface

Change-Id: Ie0cae42ac2b361cf3b94e5047c157cb0945f4209

Add origin to IntentData and use it to pick the GossipIntentStore peer

Change-Id: I50e9621a69a35ec02b8c8dd79cc926591e5a73e9
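
In outline: the ECM constructor gains a per-write peer-selection hook, and
GossipIntentStore supplies one based on the intent's partition master and the
new IntentData origin. A minimal usage sketch of the hook, using names from
the diff below (the single-leader peer choice here is illustrative only):

    // Sketch: build an eventually consistent map whose writes are pushed
    // immediately only to the key's partition leader; all other peers
    // converge through the periodic anti-entropy exchange.
    EventuallyConsistentMap<Key, IntentData> currentMap =
            new EventuallyConsistentMapImpl<>("intent-current",
                    clusterService,
                    clusterCommunicator,
                    intentSerializer,
                    new IntentDataLogicalClockManager<>(),
                    (key, data) -> ImmutableList.of(partitionService.getLeader(key)));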
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 59671c0..42a79b3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -16,7 +16,6 @@
 package org.onosproject.store.cluster.messaging.impl;
 
 import com.google.common.util.concurrent.ListenableFuture;
-
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -39,7 +38,6 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -107,7 +105,7 @@
     }
 
     @Override
-    public boolean multicast(ClusterMessage message, Set<NodeId> nodes) {
+    public boolean multicast(ClusterMessage message, Iterable<NodeId> nodes) {
         boolean ok = true;
         final ControllerNode localNode = clusterService.getLocalNode();
         byte[] payload = message.getBytes();
@@ -120,8 +118,8 @@
     }
 
     @Override
-    public boolean unicast(ClusterMessage message, NodeId toNodeId) throws IOException {
-        return unicast(message.subject(), message.getBytes(), toNodeId);
+    public boolean unicast(ClusterMessage message, NodeId toNodeId) {
+        return unicastUnchecked(message.subject(), message.getBytes(), toNodeId);
     }
 
     private boolean unicast(MessageSubject subject, byte[] payload, NodeId toNodeId) throws IOException {
@@ -137,7 +135,6 @@
         }
     }
 
-
     private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
         try {
             return unicast(subject, payload, toNodeId);
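
Note the contract change above: unicast(ClusterMessage, NodeId) no longer
throws IOException; failure is reported through the boolean result instead. A
sketch of the calling pattern the rest of this change migrates toward (the log
message is illustrative):

    // New pattern: check the boolean result instead of catching IOException.
    if (!clusterCommunicator.unicast(message, toNodeId)) {
        log.warn("Failed to unicast {} to {}", message.subject(), toNodeId);
    }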
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
index 56d01a0..06a60d5 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
@@ -21,7 +21,6 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -305,14 +304,13 @@
             ClusterMessage clusterMessage = new ClusterMessage(localNode, DEVICE_INJECTED,
                     SERIALIZER.encode(deviceInjectedEvent));
 
-            try {
-                clusterCommunicator.unicast(clusterMessage, deviceNode);
-            } catch (IOException e) {
-                log.warn("Failed to process injected device id: {} desc: {} " +
-                                "(cluster messaging failed: {})",
-                        deviceId, deviceDescription, e);
-            }
-
+            // TODO check unicast return value
+            clusterCommunicator.unicast(clusterMessage, deviceNode);
+            /* error log:
+            log.warn("Failed to process injected device id: {} desc: {} " +
+                            "(cluster messaging failed: {})",
+                    deviceId, deviceDescription, e);
+            */
         }
 
         return deviceEvent;
@@ -556,13 +554,14 @@
             PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, portDescriptions);
             ClusterMessage clusterMessage = new ClusterMessage(
                     localNode, PORT_INJECTED, SERIALIZER.encode(portInjectedEvent));
-            try {
-                clusterCommunicator.unicast(clusterMessage, deviceNode);
-            } catch (IOException e) {
-                log.warn("Failed to process injected ports of device id: {} " +
-                                "(cluster messaging failed: {})",
-                        deviceId, e);
-            }
+
+            // TODO check unicast return value
+            clusterCommunicator.unicast(clusterMessage, deviceNode);
+            /* error log:
+            log.warn("Failed to process injected ports of device id: {} " +
+                            "(cluster messaging failed: {})",
+                    deviceId, e);
+            */
         }
 
         return deviceEvents == null ? Collections.emptyList() : deviceEvents;
@@ -842,13 +841,13 @@
                      DEVICE_REMOVE_REQ,
                      SERIALIZER.encode(deviceId));
 
-             try {
-                 clusterCommunicator.unicast(message, master);
-             } catch (IOException e) {
-                 log.error("Failed to forward {} remove request to {}", deviceId, master, e);
-             }
+            // TODO check unicast return value
+            clusterCommunicator.unicast(message, master);
+            /* error log:
+            log.error("Failed to forward {} remove request to {}", deviceId, master, e);
+            */
 
-             // event will be triggered after master processes it.
+            // event will be triggered after master processes it.
              return null;
         }
 
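The TODOs above mark where the removed IOException handlers used to fire. Once
callers act on the boolean result, the device-injection site could be resolved
along these lines (a sketch, not part of this patch):

    // Possible resolution of the TODO: restore the warning on a failed send.
    if (!clusterCommunicator.unicast(clusterMessage, deviceNode)) {
        log.warn("Failed to process injected device id: {} desc: {} " +
                         "(unicast failed)", deviceId, deviceDescription);
    }
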
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 8a178cb..8f99d0e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -35,7 +35,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -51,6 +50,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -87,6 +87,7 @@
     private final ExecutorService executor;
 
     private final ScheduledExecutorService backgroundExecutor;
+    private final BiFunction<K, V, Iterable<NodeId>> peerUpdateFunction;
 
     private ExecutorService broadcastMessageExecutor;
 
@@ -140,14 +141,18 @@
      *                            both K and V
      * @param clockService        a clock service able to generate timestamps
      *                            for K
+     * @param peerUpdateFunction  function that provides the set of nodes to update
+     *                            immediately when there are writes to the map
      */
     public EventuallyConsistentMapImpl(String mapName,
                                        ClusterService clusterService,
                                        ClusterCommunicationService clusterCommunicator,
                                        KryoNamespace.Builder serializerBuilder,
-                                       ClockService<K, V> clockService) {
+                                       ClockService<K, V> clockService,
+                                       BiFunction<K, V, Iterable<NodeId>> peerUpdateFunction) {
         this.clusterService = checkNotNull(clusterService);
         this.clusterCommunicator = checkNotNull(clusterCommunicator);
+        this.peerUpdateFunction = checkNotNull(peerUpdateFunction);
 
         serializer = createSerializer(checkNotNull(serializerBuilder));
         destroyedMessage = mapName + ERROR_DESTROYED;
@@ -189,6 +194,34 @@
                                           new InternalAntiEntropyListener(), backgroundExecutor);
     }
 
+    /**
+     * Creates a new eventually consistent map shared amongst multiple instances.
+     * <p>
+     * See the other constructor for usage information. The only difference is
+     * that a default BiFunction is supplied which returns all other nodes in
+     * the cluster, so every peer is sent write updates immediately.
+     * </p>
+     *
+     * @param mapName             a String identifier for the map.
+     * @param clusterService      the cluster service
+     * @param clusterCommunicator the cluster communications service
+     * @param serializerBuilder   a Kryo namespace builder that can serialize
+     *                            both K and V
+     * @param clockService        a clock service able to generate timestamps
+     *                            for K
+     */
+    public EventuallyConsistentMapImpl(String mapName,
+                                       ClusterService clusterService,
+                                       ClusterCommunicationService clusterCommunicator,
+                                       KryoNamespace.Builder serializerBuilder,
+                                       ClockService<K, V> clockService) {
+        this(mapName, clusterService, clusterCommunicator, serializerBuilder, clockService,
+             (key, value) -> clusterService.getNodes().stream()
+                     .map(ControllerNode::id)
+                     .filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id()))
+                     .collect(Collectors.toList()));
+    }
+
     private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
         return new KryoSerializer() {
             @Override
@@ -270,11 +303,10 @@
         Timestamp timestamp = clockService.getTimestamp(key, value);
 
         if (putInternal(key, value, timestamp)) {
-            notifyPeers(new InternalPutEvent<>(key, value, timestamp));
-            EventuallyConsistentMapEvent<K, V> externalEvent
-                    = new EventuallyConsistentMapEvent<>(
-                    EventuallyConsistentMapEvent.Type.PUT, key, value);
-            notifyListeners(externalEvent);
+            notifyPeers(new InternalPutEvent<>(key, value, timestamp),
+                        peerUpdateFunction.apply(key, value));
+            notifyListeners(new EventuallyConsistentMapEvent<>(
+                    EventuallyConsistentMapEvent.Type.PUT, key, value));
         }
     }
 
@@ -318,11 +350,10 @@
         Timestamp timestamp = clockService.getTimestamp(key, null);
 
         if (removeInternal(key, timestamp)) {
-            notifyPeers(new InternalRemoveEvent<>(key, timestamp));
-            EventuallyConsistentMapEvent<K, V> externalEvent
-                    = new EventuallyConsistentMapEvent<>(
-                    EventuallyConsistentMapEvent.Type.REMOVE, key, null);
-            notifyListeners(externalEvent);
+            notifyPeers(new InternalRemoveEvent<>(key, timestamp),
+                        peerUpdateFunction.apply(key, null));
+            notifyListeners(new EventuallyConsistentMapEvent<>(
+                    EventuallyConsistentMapEvent.Type.REMOVE, key, null));
         }
     }
 
@@ -364,11 +395,10 @@
         Timestamp timestamp = clockService.getTimestamp(key, value);
 
         if (removeInternal(key, timestamp)) {
-            notifyPeers(new InternalRemoveEvent<>(key, timestamp));
-            EventuallyConsistentMapEvent<K, V> externalEvent
-                    = new EventuallyConsistentMapEvent<>(
-                    EventuallyConsistentMapEvent.Type.REMOVE, key, value);
-            notifyListeners(externalEvent);
+            notifyPeers(new InternalRemoveEvent<>(key, timestamp),
+                        peerUpdateFunction.apply(key, value));
+            notifyListeners(new EventuallyConsistentMapEvent<>(
+                    EventuallyConsistentMapEvent.Type.REMOVE, key, value));
         }
     }
 
@@ -393,7 +423,7 @@
         }
 
         if (!updates.isEmpty()) {
-            notifyPeers(new InternalPutEvent<>(updates));
+            broadcastMessage(updateMessageSubject, new InternalPutEvent<>(updates));
 
             for (PutEntry<K, V> entry : updates) {
                 EventuallyConsistentMapEvent<K, V> externalEvent =
@@ -421,7 +451,7 @@
         }
 
         if (!removed.isEmpty()) {
-            notifyPeers(new InternalRemoveEvent<>(removed));
+            broadcastMessage(removeMessageSubject, new InternalRemoveEvent<>(removed));
 
             for (RemoveEntry<K> entry : removed) {
                 EventuallyConsistentMapEvent<K, V> externalEvent
@@ -493,16 +523,26 @@
         }
     }
 
-    private void notifyPeers(InternalPutEvent event) {
+    private void notifyPeers(InternalPutEvent event, Iterable<NodeId> peers) {
         // FIXME extremely memory expensive when we are overrun
 //        broadcastMessageExecutor.execute(() -> broadcastMessage(updateMessageSubject, event));
-        broadcastMessage(updateMessageSubject, event);
+        multicastMessage(updateMessageSubject, event, peers);
     }
 
-    private void notifyPeers(InternalRemoveEvent event) {
+    private void notifyPeers(InternalRemoveEvent event, Iterable<NodeId> peers) {
         // FIXME extremely memory expensive when we are overrun
 //        broadcastMessageExecutor.execute(() -> broadcastMessage(removeMessageSubject, event));
-        broadcastMessage(removeMessageSubject, event);
+        multicastMessage(removeMessageSubject, event, peers);
+    }
+
+    private void multicastMessage(MessageSubject subject, Object event, Iterable<NodeId> peers) {
+        // FIXME can we parallelize the serialization... use the caller???
+        ClusterMessage message = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                subject,
+                serializer.encode(event));
+        broadcastMessageExecutor.execute(() -> clusterCommunicator.multicast(message, peers));
+//        clusterCommunicator.broadcast(message);
     }
 
     private void broadcastMessage(MessageSubject subject, Object event) {
@@ -515,14 +555,13 @@
 //        clusterCommunicator.broadcast(message);
     }
 
-    private void unicastMessage(NodeId peer,
-                                MessageSubject subject,
-                                Object event) throws IOException {
+    private void unicastMessage(NodeId peer, MessageSubject subject, Object event) {
         ClusterMessage message = new ClusterMessage(
                 clusterService.getLocalNode().id(),
                 subject,
                 serializer.encode(event));
-        clusterCommunicator.unicast(message, peer);
+//        clusterCommunicator.unicast(message, peer);
+        broadcastMessageExecutor.execute(() -> clusterCommunicator.unicast(message, peer));
     }
 
     private boolean underHighLoad() {
@@ -567,11 +606,9 @@
 
                 AntiEntropyAdvertisement<K> ad = createAdvertisement();
 
-                try {
-                    unicastMessage(peer, antiEntropyAdvertisementSubject, ad);
-                } catch (IOException e) {
-                    log.debug("Failed to send anti-entropy advertisement to {}", peer);
-                }
+                // TODO check the return value?
+                unicastMessage(peer, antiEntropyAdvertisementSubject, ad);
+                // error log: log.debug("Failed to send anti-entropy advertisement to {}", peer);
             } catch (Exception e) {
                 // Catch all exceptions to avoid scheduled task being suppressed.
                 log.error("Exception thrown while sending advertisement", e);
@@ -607,14 +644,9 @@
                     // Send the advertisement back if this peer is out-of-sync
                     final NodeId sender = ad.sender();
                     AntiEntropyAdvertisement<K> myAd = createAdvertisement();
-                    try {
-                        unicastMessage(sender, antiEntropyAdvertisementSubject, myAd);
-                    } catch (IOException e) {
-                        log.debug(
-                                "Failed to send reactive anti-entropy advertisement to {}",
-                                sender);
-                    }
-
+                    // TODO check the return value?
+                    unicastMessage(sender, antiEntropyAdvertisementSubject, myAd);
+                    // error log: log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
                     break;
                 }
             }
@@ -669,12 +701,10 @@
 
         // Send all updates to the peer at once
         if (!updatesToSend.isEmpty()) {
-            try {
-                unicastMessage(sender, updateMessageSubject,
-                               new InternalPutEvent<>(updatesToSend));
-            } catch (IOException e) {
-                log.warn("Failed to send advertisement response", e);
-            }
+            // TODO check the return value?
+            unicastMessage(sender, updateMessageSubject,
+                           new InternalPutEvent<>(updatesToSend));
+            // error log: log.warn("Failed to send advertisement response", e);
         }
 
         return externalEvents;
@@ -707,12 +737,10 @@
 
         // Send all removes to the peer at once
         if (!removesToSend.isEmpty()) {
-            try {
-                unicastMessage(sender, removeMessageSubject,
-                               new InternalRemoveEvent<>(removesToSend));
-            } catch (IOException e) {
-                log.warn("Failed to send advertisement response", e);
-            }
+            // TODO check the return value
+            unicastMessage(sender, removeMessageSubject,
+                           new InternalRemoveEvent<>(removesToSend));
+            // error log: log.warn("Failed to send advertisement response", e);
         }
     }
 
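For reference, the delegating five-argument constructor above is equivalent to
passing this all-peers function to the six-argument constructor yourself (a
sketch, written inside the map implementation where K and V are in scope):

    // Default peer-update function: push every write immediately to all
    // other cluster members (mirrors the delegating constructor above).
    BiFunction<K, V, Iterable<NodeId>> allPeers =
            (key, value) -> clusterService.getNodes().stream()
                    .map(ControllerNode::id)
                    .filter(id -> !id.equals(clusterService.getLocalNode().id()))
                    .collect(Collectors.toList());
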
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index 80f2914..450bb3d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -29,6 +29,7 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.BoundedThreadPool;
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.NewConcurrentHashMap;
 import org.onosproject.cluster.ClusterService;
@@ -138,7 +139,8 @@
     private ExecutorService messageHandlingExecutor;
 
     private final ExecutorService backupExecutors =
-            Executors.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
+            BoundedThreadPool.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
+            // Executors.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
 
     private boolean syncBackup = false;
 
@@ -385,12 +387,8 @@
                 SERIALIZER.encode(operation));
 
 
-        try {
-
-            clusterCommunicator.unicast(message, replicaInfo.master().get());
-
-        } catch (IOException e) {
-            log.warn("Failed to storeBatch: {}", e.getMessage());
+        if (!clusterCommunicator.unicast(message, replicaInfo.master().get())) {
+            log.warn("Failed to storeBatch: {} to {}", message, replicaInfo.master());
 
             Set<FlowRule> allFailures = operation.getOperations().stream()
                     .map(op -> op.target())
@@ -401,7 +399,6 @@
                     new CompletedBatchOperation(false, allFailures, deviceId)));
             return;
         }
-
     }
 
     private void storeBatchInternal(FlowRuleBatchOperation operation) {
@@ -576,15 +573,13 @@
         if (nodeId == null) {
             notifyDelegate(event);
         } else {
-            try {
-                ClusterMessage message = new ClusterMessage(
-                        clusterService.getLocalNode().id(),
-                        REMOTE_APPLY_COMPLETED,
-                        SERIALIZER.encode(event));
-                clusterCommunicator.unicast(message, nodeId);
-            } catch (IOException e) {
-                log.warn("Failed to respond to peer for batch operation result");
-            }
+            ClusterMessage message = new ClusterMessage(
+                    clusterService.getLocalNode().id(),
+                    REMOTE_APPLY_COMPLETED,
+                    SERIALIZER.encode(event));
+            // TODO check unicast return value
+            clusterCommunicator.unicast(message, nodeId);
+            // error log: log.warn("Failed to respond to peer for batch operation result");
         }
     }
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
index bbdbdb7..2bc992b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.store.intent.impl;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -23,6 +24,8 @@
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.util.KryoNamespace;
 import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.net.intent.Intent;
 import org.onosproject.net.intent.IntentData;
 import org.onosproject.net.intent.IntentEvent;
@@ -41,7 +44,10 @@
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.slf4j.Logger;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 import static org.onosproject.net.intent.IntentState.*;
@@ -51,6 +57,8 @@
  * Manages inventory of Intents in a distributed data store that uses optimistic
  * replication and gossip based techniques.
  */
+// FIXME we should listen for leadership changes. If the local instance has just
+// become a leader, scan the pending map and process those intents.
 @Component(immediate = false, enabled = true)
 @Service
 public class GossipIntentStore
@@ -86,15 +94,17 @@
                                                        clusterService,
                                                        clusterCommunicator,
                                                        intentSerializer,
-                                                       new IntentDataLogicalClockManager<>());
+                                                       new IntentDataLogicalClockManager<>(),
+                                                       this::getPeerNodes);
 
         pendingMap = new EventuallyConsistentMapImpl<>("intent-pending",
                                                        clusterService,
                                                        clusterCommunicator,
                                                        intentSerializer, // TODO
-                                                       new IntentDataClockManager<>());
+                                                       new IntentDataClockManager<>(),
+                                                       this::getPeerNodes);
 
-        currentMap.addListener(new InternalIntentStatesListener());
+        currentMap.addListener(new InternalCurrentListener());
         pendingMap.addListener(new InternalPendingListener());
 
         log.info("Started");
@@ -226,7 +236,6 @@
     @Override
     public void write(IntentData newData) {
         IntentData currentData = currentMap.get(newData.key());
-
         if (isUpdateAcceptable(currentData, newData)) {
             // Only the master is modifying the current state. Therefore assume
             // this always succeeds
@@ -239,6 +248,38 @@
         }
     }
 
+    private Iterable<NodeId> getPeerNodes(Key key, IntentData data) {
+        NodeId master = partitionService.getLeader(key);
+        NodeId origin = (data != null) ? data.origin() : null;
+        NodeId me = clusterService.getLocalNode().id();
+        boolean isMaster = Objects.equals(master, me);
+        boolean isOrigin = Objects.equals(origin, me);
+        if (isMaster && isOrigin) {
+            return ImmutableList.of(getRandomNode());
+        } else if (isMaster) {
+            // origin is null when the update carries no IntentData (e.g. a remove);
+            // fall back to a random peer to avoid ImmutableList.of(null) throwing
+            return origin != null ? ImmutableList.of(origin)
+                                  : ImmutableList.of(getRandomNode());
+        } else if (isOrigin) {
+            return ImmutableList.of(master);
+        } else {
+            // FIXME: why are we here? log error?
+            return ImmutableList.of(master);
+        }
+    }
+
+    private NodeId getRandomNode() {
+        List<NodeId> nodes = clusterService.getNodes().stream()
+                .map(ControllerNode::id)
+                // exclude self so we never gossip a write back to ourselves
+                .filter(id -> !id.equals(clusterService.getLocalNode().id()))
+                .collect(Collectors.toCollection(ArrayList::new));
+        Collections.shuffle(nodes);
+        // FIXME verify nodes.size() > 0
+        return nodes.get(0);
+    }
+
     @Override
     public void batchWrite(Iterable<IntentData> updates) {
         updates.forEach(this::write);
@@ -263,6 +304,7 @@
         if (data.version() == null) {
             data.setVersion(new WallClockTimestamp());
         }
+        data.setOrigin(clusterService.getLocalNode().id());
         pendingMap.put(data.key(), copyData(data));
     }
 
@@ -292,7 +334,7 @@
         }
     }
 
-    private final class InternalIntentStatesListener implements
+    private final class InternalCurrentListener implements
             EventuallyConsistentMapListener<Key, IntentData> {
         @Override
         public void event(
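
To make the getPeerNodes() selection rules concrete, a worked example (cluster
and node names hypothetical):

    // Cluster {n1, n2, n3}; key K's partition leader (master) is n2.
    // - n2 writes data originated at n2 (master == origin == me):
    //     push to one random other peer, n1 or n3
    // - n2 writes data originated at n1 (master == me, origin != me):
    //     push to the origin, n1
    // - n1 writes data it originated (origin == me, master != me):
    //     push to the master, n2
    // - any other node writes: push to the master (flagged FIXME above)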
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
index 07b89fb..5fa26a7 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
@@ -29,13 +29,13 @@
 import org.onosproject.cluster.LeadershipEvent;
 import org.onosproject.cluster.LeadershipEventListener;
 import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.net.intent.Key;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Objects;
-
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -109,8 +109,13 @@
 
     @Override
     public boolean isMine(Key intentKey) {
-        return Objects.equal(leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey))),
-                             clusterService.getLocalNode().id());
+        return Objects.equals(leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey))),
+                              clusterService.getLocalNode().id());
+    }
+
+    @Override
+    public NodeId getLeader(Key intentKey) {
+        return leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey)));
     }
 
     private void doRelinquish() {
@@ -171,7 +176,7 @@
         public void event(LeadershipEvent event) {
             Leadership leadership = event.subject();
 
-            if (Objects.equal(leadership.leader(), clusterService.getLocalNode().id()) &&
+            if (Objects.equals(leadership.leader(), clusterService.getLocalNode().id()) &&
                     leadership.topic().startsWith(ELECTION_PREFIX)) {
 
                 // See if we need to let some partitions go
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionService.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionService.java
index eaeabab1..2ee4434 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionService.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionService.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.store.intent.impl;
 
+import org.onosproject.cluster.NodeId;
 import org.onosproject.net.intent.Key;
 
 /**
@@ -31,5 +32,13 @@
      */
     boolean isMine(Key intentKey);
 
+    /**
+     * Returns the leader for a particular key.
+     *
+     * @param intentKey intent key to query
+     * @return the leader node
+     */
+    NodeId getLeader(Key intentKey);
+
     // TODO add API for rebalancing partitions
 }
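A sketch of how the new getLeader() complements isMine() for a caller (the
handler names here are hypothetical):

    // Sketch: process locally when we lead the key's partition, otherwise
    // look up the leader to forward to (e.g. via the gossip maps above).
    if (partitionService.isMine(intentKey)) {
        processLocally(intentData);              // hypothetical local handler
    } else {
        NodeId leader = partitionService.getLeader(intentKey);
        forwardToLeader(leader, intentData);     // hypothetical forwarding
    }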
diff --git a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
index d555069..75cb96c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
@@ -337,13 +337,13 @@
             ClusterMessage linkInjectedMessage = new ClusterMessage(localNode,
                     GossipLinkStoreMessageSubjects.LINK_INJECTED, SERIALIZER.encode(linkInjectedEvent));
 
-            try {
-                clusterCommunicator.unicast(linkInjectedMessage, dstNode);
-            } catch (IOException e) {
-                log.warn("Failed to process link update between src: {} and dst: {} " +
-                                "(cluster messaging failed: {})",
-                        linkDescription.src(), linkDescription.dst(), e);
-            }
+            // TODO check unicast return value
+            clusterCommunicator.unicast(linkInjectedMessage, dstNode);
+            /* error log:
+            log.warn("Failed to process link update between src: {} and dst: {} " +
+                            "(cluster messaging failed: {})",
+                    linkDescription.src(), linkDescription.dst(), e);
+            */
 
         }
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index eb0c38a..45b540d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -15,19 +15,13 @@
  */
 package org.onosproject.store.packet.impl;
 
-import static org.onlab.util.Tools.groupedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.mastership.MastershipService;
@@ -43,9 +37,14 @@
 import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.serializers.KryoSerializer;
-import org.onlab.util.KryoNamespace;
 import org.slf4j.Logger;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
 /**
  * Distributed packet store implementation allowing packets to be sent to
  * remote instances.
@@ -118,12 +117,10 @@
             return;
         }
 
-        try {
-            communicationService.unicast(new ClusterMessage(
-                    myId, PACKET_OUT_SUBJECT, SERIALIZER.encode(packet)), master);
-        } catch (IOException e) {
-            log.warn("Failed to send packet-out to {}", master);
-        }
+        // TODO check unicast return value
+        communicationService.unicast(new ClusterMessage(
+                myId, PACKET_OUT_SUBJECT, SERIALIZER.encode(packet)), master);
+        // error log: log.warn("Failed to send packet-out to {}", master);
     }
 
     /**