clean batch operations

Change-Id: I7187de40bb5276d6ae9e9831e5d47d36e16560ad
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
index a9eddd8..a897cbb 100644
--- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
@@ -5,10 +5,12 @@
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -26,6 +28,7 @@
 import org.onlab.onos.net.flow.FlowEntry;
 import org.onlab.onos.net.flow.FlowRule;
 import org.onlab.onos.net.flow.FlowRuleBatchEntry;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
 import org.onlab.onos.net.flow.FlowRuleBatchOperation;
 import org.onlab.onos.net.flow.FlowRuleEvent;
 import org.onlab.onos.net.flow.FlowRuleListener;
@@ -52,6 +55,8 @@
         extends AbstractProviderRegistry<FlowRuleProvider, FlowRuleProviderService>
         implements FlowRuleService, FlowRuleProviderRegistry {
 
+    enum BatchState { STARTED, FINISHED, CANCELLED };
+
     public static final String FLOW_RULE_NULL = "FlowRule cannot be null";
     private final Logger log = getLogger(getClass());
 
@@ -144,7 +149,7 @@
             FlowRuleBatchOperation batch) {
         Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches =
                 ArrayListMultimap.create();
-        List<Future<Void>> futures = Lists.newArrayList();
+        List<Future<CompletedBatchOperation>> futures = Lists.newArrayList();
         for (FlowRuleBatchEntry fbe : batch.getOperations()) {
             final FlowRule f = fbe.getTarget();
             final Device device = deviceService.getDevice(f.deviceId());
@@ -165,10 +170,10 @@
         for (FlowRuleProvider provider : batches.keySet()) {
             FlowRuleBatchOperation b =
                     new FlowRuleBatchOperation(batches.get(provider));
-            Future<Void> future = provider.executeBatch(b);
+            Future<CompletedBatchOperation> future = provider.executeBatch(b);
             futures.add(future);
         }
-        return new FlowRuleBatchFuture(futures);
+        return new FlowRuleBatchFuture(futures, batches);
     }
 
     @Override
@@ -341,59 +346,140 @@
     private class FlowRuleBatchFuture
         implements Future<CompletedBatchOperation> {
 
-        private final List<Future<Void>> futures;
+        private final List<Future<CompletedBatchOperation>> futures;
+        private final Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches;
+        private final AtomicReference<BatchState> state;
+        private CompletedBatchOperation overall;
 
-        public FlowRuleBatchFuture(List<Future<Void>> futures) {
+
+
+        public FlowRuleBatchFuture(List<Future<CompletedBatchOperation>> futures,
+                Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches) {
             this.futures = futures;
+            this.batches = batches;
+            state = new AtomicReference<FlowRuleManager.BatchState>();
+            state.set(BatchState.STARTED);
         }
 
         @Override
         public boolean cancel(boolean mayInterruptIfRunning) {
-            // TODO Auto-generated method stub
-            return false;
+            if (state.get() == BatchState.FINISHED) {
+                return false;
+            }
+            if (!state.compareAndSet(BatchState.STARTED, BatchState.CANCELLED)) {
+                return false;
+            }
+            cleanUpBatch();
+            for (Future<CompletedBatchOperation> f : futures) {
+                f.cancel(mayInterruptIfRunning);
+            }
+            return true;
         }
 
         @Override
         public boolean isCancelled() {
-            // TODO Auto-generated method stub
-            return false;
+            return state.get() == BatchState.CANCELLED;
         }
 
         @Override
         public boolean isDone() {
-            boolean isDone = true;
-            for (Future<Void> future : futures) {
-                isDone &= future.isDone();
-            }
-            return isDone;
+            return state.get() == BatchState.FINISHED;
         }
 
+
         @Override
         public CompletedBatchOperation get() throws InterruptedException,
-        ExecutionException {
-            // TODO Auto-generated method stub
-            for (Future<Void> future : futures) {
-                future.get();
+            ExecutionException {
+
+            if (isDone()) {
+                return overall;
             }
-            return new CompletedBatchOperation();
+
+            boolean success = true;
+            List<FlowEntry> failed = Lists.newLinkedList();
+            CompletedBatchOperation completed;
+            for (Future<CompletedBatchOperation> future : futures) {
+                completed = future.get();
+                success = validateBatchOperation(failed, completed, future);
+            }
+
+            return finalizeBatchOperation(success, failed);
+
         }
 
         @Override
         public CompletedBatchOperation get(long timeout, TimeUnit unit)
                 throws InterruptedException, ExecutionException,
                 TimeoutException {
-            // TODO we should decrement the timeout
+
+            if (isDone()) {
+                return overall;
+            }
+            boolean success = true;
+            List<FlowEntry> failed = Lists.newLinkedList();
+            CompletedBatchOperation completed;
             long start = System.nanoTime();
             long end = start + unit.toNanos(timeout);
-            for (Future<Void> future : futures) {
+
+            for (Future<CompletedBatchOperation> future : futures) {
                 long now = System.nanoTime();
                 long thisTimeout = end - now;
-                future.get(thisTimeout, TimeUnit.NANOSECONDS);
+                completed = future.get(thisTimeout, TimeUnit.NANOSECONDS);
+                success = validateBatchOperation(failed, completed, future);
             }
-            return new CompletedBatchOperation();
+            return finalizeBatchOperation(success, failed);
         }
 
+        private boolean validateBatchOperation(List<FlowEntry> failed,
+                CompletedBatchOperation completed,
+                Future<CompletedBatchOperation> future) {
+
+            if (isCancelled()) {
+                throw new CancellationException();
+            }
+            if (!completed.isSuccess()) {
+                failed.addAll(completed.failedItems());
+                cleanUpBatch();
+                cancelAllSubBatches();
+                return false;
+            }
+            return true;
+        }
+
+        private void cancelAllSubBatches() {
+            for (Future<CompletedBatchOperation> f : futures) {
+                f.cancel(true);
+            }
+        }
+
+        private CompletedBatchOperation finalizeBatchOperation(boolean success,
+                List<FlowEntry> failed) {
+            synchronized (overall) {
+                if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
+                    if (state.get() == BatchState.FINISHED) {
+                        return overall;
+                    }
+                    throw new CancellationException();
+                }
+                overall = new CompletedBatchOperation(success, failed);
+                return overall;
+            }
+        }
+
+        private void cleanUpBatch() {
+            for (FlowRuleBatchEntry fbe : batches.values()) {
+                if (fbe.getOperator() == FlowRuleOperation.ADD ||
+                        fbe.getOperator() == FlowRuleOperation.MODIFY) {
+                    store.deleteFlowRule(fbe.getTarget());
+                } else if (fbe.getOperator() == FlowRuleOperation.REMOVE) {
+                    store.storeFlowRule(fbe.getTarget());
+                }
+            }
+
+        }
     }
 
 
+
+
 }