Towards a distributed flow rule store
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
index 67e0867..ba37d22 100644
--- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
@@ -5,8 +5,10 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -30,7 +32,9 @@
 import org.onlab.onos.net.flow.FlowRule;
 import org.onlab.onos.net.flow.FlowRuleBatchEntry;
 import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
+import org.onlab.onos.net.flow.FlowRuleBatchEvent;
 import org.onlab.onos.net.flow.FlowRuleBatchOperation;
+import org.onlab.onos.net.flow.FlowRuleBatchRequest;
 import org.onlab.onos.net.flow.FlowRuleEvent;
 import org.onlab.onos.net.flow.FlowRuleListener;
 import org.onlab.onos.net.flow.FlowRuleProvider;
@@ -47,6 +51,9 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 
 /**
  * Provides implementation of the flow NB & SB APIs.
@@ -104,11 +111,7 @@
     public void applyFlowRules(FlowRule... flowRules) {
         for (int i = 0; i < flowRules.length; i++) {
             FlowRule f = flowRules[i];
-            boolean local = store.storeFlowRule(f);
-            if (local) {
-                // TODO: aggregate all local rules and push down once?
-                applyFlowRulesToProviders(f);
-            }
+            store.storeFlowRule(f);
         }
     }
 
@@ -132,11 +135,7 @@
         FlowRule f;
         for (int i = 0; i < flowRules.length; i++) {
             f = flowRules[i];
-            boolean local = store.deleteFlowRule(f);
-            if (local) {
-                // TODO: aggregate all local rules and push down once?
-                removeFlowRulesFromProviders(f);
-            }
+            store.deleteFlowRule(f);
         }
     }
 
@@ -180,33 +179,21 @@
     @Override
     public Future<CompletedBatchOperation> applyBatch(
             FlowRuleBatchOperation batch) {
-        Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches =
+        Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches =
                 ArrayListMultimap.create();
         List<Future<CompletedBatchOperation>> futures = Lists.newArrayList();
         for (FlowRuleBatchEntry fbe : batch.getOperations()) {
             final FlowRule f = fbe.getTarget();
-            final Device device = deviceService.getDevice(f.deviceId());
-            final FlowRuleProvider frp = getProvider(device.providerId());
-            batches.put(frp, fbe);
-            switch (fbe.getOperator()) {
-                case ADD:
-                    store.storeFlowRule(f);
-                    break;
-                case REMOVE:
-                    store.deleteFlowRule(f);
-                    break;
-                case MODIFY:
-                default:
-                    log.error("Batch operation type {} unsupported.", fbe.getOperator());
-            }
+            perDeviceBatches.put(f.deviceId(), fbe);
         }
-        for (FlowRuleProvider provider : batches.keySet()) {
+
+        for (DeviceId deviceId : perDeviceBatches.keySet()) {
             FlowRuleBatchOperation b =
-                    new FlowRuleBatchOperation(batches.get(provider));
-            Future<CompletedBatchOperation> future = provider.executeBatch(b);
+                    new FlowRuleBatchOperation(perDeviceBatches.get(deviceId));
+            Future<CompletedBatchOperation> future = store.storeBatch(b);
             futures.add(future);
         }
-        return new FlowRuleBatchFuture(futures, batches);
+        return new FlowRuleBatchFuture(futures, perDeviceBatches);
     }
 
     @Override
@@ -318,6 +305,7 @@
                     post(event);
                 }
             } else {
+                log.info("Removing flow rules....");
                 removeFlowRules(flowEntry);
             }
 
@@ -385,21 +373,47 @@
 
     // Store delegate to re-post events emitted from the store.
     private class InternalStoreDelegate implements FlowRuleStoreDelegate {
+        // TODO: Right now we only dispatch events at individual flowEntry level.
+        // It may be more efficient to also dispatch events as a batch.
         @Override
-        public void notify(FlowRuleEvent event) {
+        public void notify(FlowRuleBatchEvent event) {
+            final FlowRuleBatchRequest request = event.subject();
             switch (event.type()) {
-            case RULE_ADD_REQUESTED:
-                applyFlowRulesToProviders(event.subject());
-                break;
-            case RULE_REMOVE_REQUESTED:
-                removeFlowRulesFromProviders(event.subject());
-                break;
+            case BATCH_OPERATION_REQUESTED:
+//                for (FlowEntry entry : request.toAdd()) {
+//                    //eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED, entry));
+//                }
+//                for (FlowEntry entry : request.toRemove()) {
+//                    //eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVE_REQUESTED, entry));
+//                }
+//                // FIXME: what about op.equals(FlowRuleOperation.MODIFY) ?
+//
+                FlowRuleBatchOperation batchOperation = request.asBatchOperation();
 
-            case RULE_ADDED:
-            case RULE_REMOVED:
-            case RULE_UPDATED:
-                // only dispatch events related to switch
-                eventDispatcher.post(event);
+                FlowRuleProvider flowRuleProvider =
+                        getProvider(batchOperation.getOperations().get(0).getTarget().deviceId());
+                final ListenableFuture<CompletedBatchOperation> result =
+                        flowRuleProvider.executeBatch(batchOperation);
+                result.addListener(new Runnable() {
+                    @Override
+                    public void run() {
+                        store.batchOperationComplete(FlowRuleBatchEvent.create(request, Futures.getUnchecked(result)));
+                    }
+                }, Executors.newCachedThreadPool());
+
+                break;
+            case BATCH_OPERATION_COMPLETED:
+                Set<FlowEntry> failedItems = event.result().failedItems();
+                for (FlowEntry entry : request.toAdd()) {
+                    if (!failedItems.contains(entry)) {
+                        eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADDED, entry));
+                    }
+                }
+                for (FlowEntry entry : request.toRemove()) {
+                    if (!failedItems.contains(entry)) {
+                            eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVED, entry));
+                    }
+                }
                 break;
             default:
                 break;
@@ -407,18 +421,15 @@
         }
     }
 
-    private class FlowRuleBatchFuture
-        implements Future<CompletedBatchOperation> {
+    private class FlowRuleBatchFuture implements Future<CompletedBatchOperation> {
 
         private final List<Future<CompletedBatchOperation>> futures;
-        private final Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches;
+        private final Multimap<DeviceId, FlowRuleBatchEntry> batches;
         private final AtomicReference<BatchState> state;
         private CompletedBatchOperation overall;
 
-
-
         public FlowRuleBatchFuture(List<Future<CompletedBatchOperation>> futures,
-                Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches) {
+                Multimap<DeviceId, FlowRuleBatchEntry> batches) {
             this.futures = futures;
             this.batches = batches;
             state = new AtomicReference<FlowRuleManager.BatchState>();
@@ -460,7 +471,7 @@
             }
 
             boolean success = true;
-            List<FlowEntry> failed = Lists.newLinkedList();
+            Set<FlowEntry> failed = Sets.newHashSet();
             CompletedBatchOperation completed;
             for (Future<CompletedBatchOperation> future : futures) {
                 completed = future.get();
@@ -480,7 +491,7 @@
                 return overall;
             }
             boolean success = true;
-            List<FlowEntry> failed = Lists.newLinkedList();
+            Set<FlowEntry> failed = Sets.newHashSet();
             CompletedBatchOperation completed;
             long start = System.nanoTime();
             long end = start + unit.toNanos(timeout);
@@ -494,7 +505,7 @@
             return finalizeBatchOperation(success, failed);
         }
 
-        private boolean validateBatchOperation(List<FlowEntry> failed,
+        private boolean validateBatchOperation(Set<FlowEntry> failed,
                 CompletedBatchOperation completed) {
 
             if (isCancelled()) {
@@ -516,7 +527,7 @@
         }
 
         private CompletedBatchOperation finalizeBatchOperation(boolean success,
-                List<FlowEntry> failed) {
+                Set<FlowEntry> failed) {
             synchronized (this) {
                 if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
                     if (state.get() == BatchState.FINISHED) {
@@ -539,11 +550,6 @@
                     store.storeFlowRule(fbe.getTarget());
                 }
             }
-
         }
     }
-
-
-
-
 }