getFlowEntries now operates correctly in a distributed setting
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index e5aa3e8..cac31c2 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -11,6 +11,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -159,6 +160,21 @@
}
});
+ clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() {
+
+ @Override
+ public void handle(ClusterMessage message) {
+ DeviceId deviceId = SERIALIZER.decode(message.payload());
+ log.info("Received get flow entries request for {} from {}", deviceId, message.sender());
+ Set<FlowEntry> flowEntries = getFlowEntriesInternal(deviceId);
+ try {
+ message.respond(SERIALIZER.encode(flowEntries));
+ } catch (IOException e) {
+ log.error("Failed to respond to peer's getFlowEntries request", e);
+ }
+ }
+ });
+
log.info("Started");
}
@@ -217,9 +233,33 @@
@Override
public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
+
+ ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
+ if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
+ return getFlowEntriesInternal(deviceId);
+ }
+
+ log.info("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
+ replicaInfo.master().orNull(), deviceId);
+
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ GET_DEVICE_FLOW_ENTRIES,
+ SERIALIZER.encode(deviceId));
+
+ try {
+ ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+ return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
+ } catch (IOException | TimeoutException e) {
+ // FIXME: throw a FlowStoreException
+ throw new RuntimeException(e);
+ }
+ }
+
+ private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
Collection<? extends FlowEntry> rules = flowEntries.get(deviceId);
if (rules == null) {
- return Collections.emptyList();
+ return Collections.emptySet();
}
return ImmutableSet.copyOf(rules);
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java
index ef68b55..8f4a050 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java
@@ -13,4 +13,7 @@
public static final MessageSubject GET_FLOW_ENTRY
= new MessageSubject("peer-forward-get-flow-entry");
+
+ public static final MessageSubject GET_DEVICE_FLOW_ENTRIES
+ = new MessageSubject("peer-forward-get-device-flow-entries");
}