initial working impl of batch operations

Change-Id: Ie970543dec1104a394c7bcfa6eec24c0538278d6
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
index ce11cea..a9eddd8 100644
--- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
@@ -5,6 +5,10 @@
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -18,8 +22,11 @@
 import org.onlab.onos.net.Device;
 import org.onlab.onos.net.DeviceId;
 import org.onlab.onos.net.device.DeviceService;
+import org.onlab.onos.net.flow.CompletedBatchOperation;
 import org.onlab.onos.net.flow.FlowEntry;
 import org.onlab.onos.net.flow.FlowRule;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry;
+import org.onlab.onos.net.flow.FlowRuleBatchOperation;
 import org.onlab.onos.net.flow.FlowRuleEvent;
 import org.onlab.onos.net.flow.FlowRuleListener;
 import org.onlab.onos.net.flow.FlowRuleProvider;
@@ -32,7 +39,9 @@
 import org.onlab.onos.net.provider.AbstractProviderService;
 import org.slf4j.Logger;
 
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
 
 /**
  * Provides implementation of the flow NB & SB APIs.
@@ -131,6 +140,38 @@
     }
 
     @Override
+    public Future<CompletedBatchOperation> applyBatch(
+            FlowRuleBatchOperation batch) {
+        Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches =
+                ArrayListMultimap.create();
+        List<Future<Void>> futures = Lists.newArrayList();
+        for (FlowRuleBatchEntry fbe : batch.getOperations()) {
+            final FlowRule f = fbe.getTarget();
+            final Device device = deviceService.getDevice(f.deviceId());
+            final FlowRuleProvider frp = getProvider(device.providerId());
+            batches.put(frp, fbe);
+            switch (fbe.getOperator()) {
+                case ADD:
+                    store.storeFlowRule(f);
+                    break;
+                case REMOVE:
+                    store.deleteFlowRule(f);
+                    break;
+                case MODIFY:
+                default:
+                    log.error("Batch operation type {} unsupported.", fbe.getOperator());
+            }
+        }
+        for (FlowRuleProvider provider : batches.keySet()) {
+            FlowRuleBatchOperation b =
+                    new FlowRuleBatchOperation(batches.get(provider));
+            Future<Void> future = provider.executeBatch(b);
+            futures.add(future);
+        }
+        return new FlowRuleBatchFuture(futures);
+    }
+
+    @Override
     public void addListener(FlowRuleListener listener) {
         listenerRegistry.addListener(listener);
     }
@@ -296,4 +337,63 @@
             eventDispatcher.post(event);
         }
     }
+
+    private class FlowRuleBatchFuture
+        implements Future<CompletedBatchOperation> {
+
+        private final List<Future<Void>> futures;
+
+        public FlowRuleBatchFuture(List<Future<Void>> futures) {
+            this.futures = futures;
+        }
+
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            // TODO Auto-generated method stub
+            return false;
+        }
+
+        @Override
+        public boolean isCancelled() {
+            // TODO Auto-generated method stub
+            return false;
+        }
+
+        @Override
+        public boolean isDone() {
+            boolean isDone = true;
+            for (Future<Void> future : futures) {
+                isDone &= future.isDone();
+            }
+            return isDone;
+        }
+
+        @Override
+        public CompletedBatchOperation get() throws InterruptedException,
+        ExecutionException {
+            // TODO Auto-generated method stub
+            for (Future<Void> future : futures) {
+                future.get();
+            }
+            return new CompletedBatchOperation();
+        }
+
+        @Override
+        public CompletedBatchOperation get(long timeout, TimeUnit unit)
+                throws InterruptedException, ExecutionException,
+                TimeoutException {
+            // TODO we should decrement the timeout
+            long start = System.nanoTime();
+            long end = start + unit.toNanos(timeout);
+            for (Future<Void> future : futures) {
+                long now = System.nanoTime();
+                long thisTimeout = end - now;
+                future.get(thisTimeout, TimeUnit.NANOSECONDS);
+            }
+            return new CompletedBatchOperation();
+        }
+
+    }
+
+
 }