DistributedFlowRuleStore: support remote removeFlowRule needed for cancelling Batch
Change-Id: I40f8dd8c2008e93c5ac7393295374726f83353c7
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index 6ab5e12..8126554 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -213,6 +213,21 @@
}
});
+ clusterCommunicator.addSubscriber(REMOVE_FLOW_ENTRY, new ClusterMessageHandler() {
+
+ @Override
+ public void handle(ClusterMessage message) {
+ FlowEntry rule = SERIALIZER.decode(message.payload());
+ log.trace("received get flow entry request for {}", rule);
+ FlowRuleEvent event = removeFlowRuleInternal(rule);
+ try {
+ message.respond(SERIALIZER.encode(event));
+ } catch (IOException e) {
+ log.error("Failed to respond back", e);
+ }
+ }
+ });
+
replicaInfoEventListener = new InternalReplicaInfoEventListener();
replicaInfoManager.addListener(replicaInfoEventListener);
@@ -222,6 +237,10 @@
@Deactivate
public void deactivate() {
+ clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
+ clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
+ clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
+ clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
replicaInfoManager.removeListener(replicaInfoEventListener);
log.info("Stopped");
}
@@ -507,9 +526,21 @@
return removeFlowRuleInternal(rule);
}
- log.warn("Tried to remove FlowRule {},"
- + " while the Node was not the master.", rule);
- return null;
+ log.trace("Forwarding removeFlowRule to {}, which is the primary (master) for device {}",
+ replicaInfo.master().orNull(), rule.deviceId());
+
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ REMOVE_FLOW_ENTRY,
+ SERIALIZER.encode(rule));
+
+ try {
+ Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+ return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
+ } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
+ // FIXME: throw a FlowStoreException
+ throw new RuntimeException(e);
+ }
}
private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java
index a1cee06..bc60673 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java
@@ -31,4 +31,7 @@
public static final MessageSubject GET_DEVICE_FLOW_ENTRIES
= new MessageSubject("peer-forward-get-device-flow-entries");
+
+ public static final MessageSubject REMOVE_FLOW_ENTRY
+ = new MessageSubject("peer-forward-remove-flow-entry");
}