getFlowEntries now operates correctly in a distributed setting
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index e5aa3e8..cac31c2 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -11,6 +11,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -159,6 +160,21 @@
             }
         });
 
+        clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() {
+
+            @Override
+            public void handle(ClusterMessage message) {
+                DeviceId deviceId = SERIALIZER.decode(message.payload());
+                log.info("Received get flow entries request for {} from {}", deviceId, message.sender());
+                Set<FlowEntry> flowEntries = getFlowEntriesInternal(deviceId);
+                try {
+                    message.respond(SERIALIZER.encode(flowEntries));
+                } catch (IOException e) {
+                    log.error("Failed to respond to peer's getFlowEntries request", e);
+                }
+            }
+        });
+
         log.info("Started");
     }
 
@@ -217,9 +233,33 @@
 
     @Override
     public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
+
+        ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
+        if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
+            return getFlowEntriesInternal(deviceId);
+        }
+
+        log.info("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
+                replicaInfo.master().orNull(), deviceId);
+
+        ClusterMessage message = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                GET_DEVICE_FLOW_ENTRIES,
+                SERIALIZER.encode(deviceId));
+
+        try {
+            ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+            return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
+        } catch (IOException | TimeoutException e) {
+            // FIXME: throw a FlowStoreException
+            throw new RuntimeException(e);
+        }
+    }
+
+    private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
         Collection<? extends FlowEntry> rules = flowEntries.get(deviceId);
         if (rules == null) {
-            return Collections.emptyList();
+            return Collections.emptySet();
         }
         return ImmutableSet.copyOf(rules);
     }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java
index ef68b55..8f4a050 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java
@@ -13,4 +13,7 @@
 
     public static final MessageSubject GET_FLOW_ENTRY
         = new MessageSubject("peer-forward-get-flow-entry");
+
+    public static final MessageSubject GET_DEVICE_FLOW_ENTRIES
+        = new MessageSubject("peer-forward-get-device-flow-entries");
 }