DistributedFlowRuleStore: support remote removeFlowRule needed for cancelling Batch

Change-Id: I40f8dd8c2008e93c5ac7393295374726f83353c7
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index 6ab5e12..8126554 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -213,6 +213,21 @@
             }
         });
 
+        clusterCommunicator.addSubscriber(REMOVE_FLOW_ENTRY, new ClusterMessageHandler() {
+
+            @Override
+            public void handle(ClusterMessage message) {
+                FlowEntry rule = SERIALIZER.decode(message.payload());
+                log.trace("received get flow entry request for {}", rule);
+                FlowRuleEvent event = removeFlowRuleInternal(rule);
+                try {
+                    message.respond(SERIALIZER.encode(event));
+                } catch (IOException e) {
+                    log.error("Failed to respond back", e);
+                }
+            }
+        });
+
         replicaInfoEventListener = new InternalReplicaInfoEventListener();
 
         replicaInfoManager.addListener(replicaInfoEventListener);
@@ -222,6 +237,10 @@
 
     @Deactivate
     public void deactivate() {
+        clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
+        clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
+        clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
+        clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
         replicaInfoManager.removeListener(replicaInfoEventListener);
         log.info("Stopped");
     }
@@ -507,9 +526,21 @@
             return removeFlowRuleInternal(rule);
         }
 
-        log.warn("Tried to remove FlowRule {},"
-                + " while the Node was not the master.", rule);
-        return null;
+        log.trace("Forwarding removeFlowRule to {}, which is the primary (master) for device {}",
+                  replicaInfo.master().orNull(), rule.deviceId());
+
+        ClusterMessage message = new ClusterMessage(
+                  clusterService.getLocalNode().id(),
+                  REMOVE_FLOW_ENTRY,
+                  SERIALIZER.encode(rule));
+
+        try {
+            Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+            return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
+        } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
+            // FIXME: throw a FlowStoreException
+            throw new RuntimeException(e);
+        }
     }
 
     private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java
index a1cee06..bc60673 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java
@@ -31,4 +31,7 @@
 
     public static final MessageSubject GET_DEVICE_FLOW_ENTRIES
         = new MessageSubject("peer-forward-get-device-flow-entries");
+
+    public static final MessageSubject REMOVE_FLOW_ENTRY
+        = new MessageSubject("peer-forward-remove-flow-entry");
 }