Towards a distributed flow rule store
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index bde57c6..85f928a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -5,10 +5,14 @@
 import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.List;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -19,11 +23,17 @@
 import org.onlab.onos.ApplicationId;
 import org.onlab.onos.cluster.ClusterService;
 import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.flow.CompletedBatchOperation;
 import org.onlab.onos.net.flow.DefaultFlowEntry;
 import org.onlab.onos.net.flow.FlowEntry;
 import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
 import org.onlab.onos.net.flow.FlowRule;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry;
+import org.onlab.onos.net.flow.FlowRuleBatchEvent;
+import org.onlab.onos.net.flow.FlowRuleBatchOperation;
+import org.onlab.onos.net.flow.FlowRuleBatchRequest;
 import org.onlab.onos.net.flow.FlowRuleEvent;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
 import org.onlab.onos.net.flow.FlowRuleEvent.Type;
 import org.onlab.onos.net.flow.FlowRuleStore;
 import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
@@ -43,6 +53,7 @@
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.Futures;
 
 /**
  * Manages inventory of flow rules using a distributed state management protocol.
@@ -50,7 +61,7 @@
 @Component(immediate = true)
 @Service
 public class DistributedFlowRuleStore
-        extends AbstractStore<FlowRuleEvent, FlowRuleStoreDelegate>
+        extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
         implements FlowRuleStore {
 
     private final Logger log = getLogger(getClass());
@@ -92,7 +103,7 @@
             public void handle(ClusterMessage message) {
                 FlowRule rule = SERIALIZER.decode(message.payload());
                 log.info("received add request for {}", rule);
-                storeFlowEntryInternal(rule);
+                storeFlowRule(rule);
                 // FIXME what to respond.
                 try {
                     message.respond(SERIALIZER.encode("ACK"));
@@ -108,7 +119,7 @@
             public void handle(ClusterMessage message) {
                 FlowRule rule = SERIALIZER.decode(message.payload());
                 log.info("received delete request for {}", rule);
-                deleteFlowRuleInternal(rule);
+                deleteFlowRule(rule);
                 // FIXME what to respond.
                 try {
                     message.respond(SERIALIZER.encode("ACK"));
@@ -118,6 +129,22 @@
 
             }
         });
+
+        clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
+
+            @Override
+            public void handle(ClusterMessage message) {
+                FlowRule rule = SERIALIZER.decode(message.payload());
+                log.info("received get flow entry request for {}", rule);
+                FlowEntry flowEntry = getFlowEntryInternal(rule);
+                try {
+                    message.respond(SERIALIZER.encode(flowEntry));
+                } catch (IOException e) {
+                    log.error("Failed to respond back", e);
+                }
+            }
+        });
+
         log.info("Started");
     }
 
@@ -127,6 +154,9 @@
     }
 
 
+    // TODO: This is not an efficient operation on a distributed sharded
+    // flow store. We need to revisit the need for this operation or at least
+    // make it device specific.
     @Override
     public int getFlowRuleCount() {
         return flowEntries.size();
@@ -134,7 +164,26 @@
 
     @Override
     public synchronized FlowEntry getFlowEntry(FlowRule rule) {
-        return getFlowEntryInternal(rule);
+        ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
+        if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
+            return getFlowEntryInternal(rule);
+        }
+
+        log.info("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
+                replicaInfo.master().orNull(), rule.deviceId());
+
+        ClusterMessage message = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                FlowStoreMessageSubjects.GET_FLOW_ENTRY,
+                SERIALIZER.encode(rule));
+
+        try {
+            ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+            return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
+        } catch (IOException | TimeoutException e) {
+            // FIXME: throw a FlowStoreException
+            throw new RuntimeException(e);
+        }
     }
 
     private synchronized StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
@@ -165,19 +214,30 @@
     }
 
     @Override
-    public boolean storeFlowRule(FlowRule rule) {
-        ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
-        if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
-            return storeFlowEntryInternal(rule);
+    public void storeFlowRule(FlowRule rule) {
+        storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule))));
+    }
+
+    public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
+        if (operation.getOperations().isEmpty()) {
+            return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
         }
 
-        log.info("Forwarding storeFlowRule to {}, which is the primary (master) for device {}",
-                replicaInfo.master().orNull(), rule.deviceId());
+        DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
+
+        ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
+
+        if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
+            return storeBatchInternal(operation);
+        }
+
+        log.info("Forwarding storeBatch to {}, which is the primary (master) for device {}",
+                replicaInfo.master().orNull(), deviceId);
 
         ClusterMessage message = new ClusterMessage(
                 clusterService.getLocalNode().id(),
                 FlowStoreMessageSubjects.STORE_FLOW_RULE,
-                SERIALIZER.encode(rule));
+                SERIALIZER.encode(operation));
 
         try {
             ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
@@ -186,58 +246,44 @@
             // FIXME: throw a FlowStoreException
             throw new RuntimeException(e);
         }
-        return false;
+
+        return null;
     }
 
-    private synchronized boolean storeFlowEntryInternal(FlowRule flowRule) {
-        StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
-        DeviceId deviceId = flowRule.deviceId();
-        // write to local copy.
-        if (!flowEntries.containsEntry(deviceId, flowEntry)) {
-            flowEntries.put(deviceId, flowEntry);
-            flowEntriesById.put(flowRule.appId(), flowEntry);
-            notifyDelegate(new FlowRuleEvent(Type.RULE_ADD_REQUESTED, flowRule));
-            return true;
+    private synchronized Future<CompletedBatchOperation> storeBatchInternal(FlowRuleBatchOperation operation) {
+        List<FlowEntry> toRemove = new ArrayList<>();
+        List<FlowEntry> toAdd = new ArrayList<>();
+        // TODO: backup changes to hazelcast map
+        for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
+            FlowRule flowRule = batchEntry.getTarget();
+            FlowRuleOperation op = batchEntry.getOperator();
+            if (op.equals(FlowRuleOperation.REMOVE)) {
+                StoredFlowEntry entry = getFlowEntryInternal(flowRule);
+                if (entry != null) { // unknown rules are skipped so no null reaches the batch request
+                    entry.setState(FlowEntryState.PENDING_REMOVE);
+                    toRemove.add(entry);
+                }
+            } else if (op.equals(FlowRuleOperation.ADD)) {
+                StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
+                DeviceId deviceId = flowRule.deviceId();
+                if (!flowEntries.containsEntry(deviceId, flowEntry)) { // duplicates are not re-added
+                    flowEntries.put(deviceId, flowEntry);
+                    flowEntriesById.put(flowRule.appId(), flowEntry);
+                    toAdd.add(flowEntry);
+                }
+            }
         }
-        // write to backup.
-        // TODO: write to a hazelcast map.
-        return false;
+        if (toAdd.isEmpty() && toRemove.isEmpty()) { // nothing changed locally: succeed without notifying
+            return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
+        }
+        notifyDelegate(FlowRuleBatchEvent.create(new FlowRuleBatchRequest(toAdd, toRemove)));
+        // TODO: implement this.
+        return Futures.immediateFailedFuture(new RuntimeException("Implement this."));
     }
 
     @Override
-    public synchronized boolean deleteFlowRule(FlowRule rule) {
-        ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
-        if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
-            return deleteFlowRuleInternal(rule);
-        }
-
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                FlowStoreMessageSubjects.DELETE_FLOW_RULE,
-                SERIALIZER.encode(rule));
-
-        try {
-            ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
-            response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
-        } catch (IOException | TimeoutException e) {
-            // FIXME: throw a FlowStoreException
-            throw new RuntimeException(e);
-        }
-        return false;
-    }
-
-    private synchronized boolean deleteFlowRuleInternal(FlowRule flowRule) {
-        StoredFlowEntry entry = getFlowEntryInternal(flowRule);
-        if (entry == null) {
-            return false;
-        }
-        entry.setState(FlowEntryState.PENDING_REMOVE);
-
-        // TODO: also update backup.
-
-        notifyDelegate(new FlowRuleEvent(Type.RULE_REMOVE_REQUESTED, flowRule));
-
-        return true;
+    public void deleteFlowRule(FlowRule rule) {
+        storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule))));
     }
 
     @Override
@@ -315,4 +361,9 @@
         }
         // TODO: also update backup.
     }
+
+    @Override
+    public void batchOperationComplete(FlowRuleBatchEvent event) {
+        notifyDelegate(event);
+    }
 }