clean batch operations

Change-Id: I7187de40bb5276d6ae9e9831e5d47d36e16560ad
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java b/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
index bde752e..e9889cd 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
@@ -1,6 +1,29 @@
 package org.onlab.onos.net.flow;
 
-public class CompletedBatchOperation {
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+public class CompletedBatchOperation implements BatchOperationResult<FlowEntry> {
+
+
+    private final boolean success;
+    private final List<FlowEntry> failures;
+
+    public CompletedBatchOperation(boolean success, List<FlowEntry> failures) {
+        this.success = success;
+        this.failures = ImmutableList.copyOf(failures);
+    }
+
+    @Override
+    public boolean isSuccess() {
+        return success;
+    }
+
+    @Override
+    public List<FlowEntry> failedItems() {
+        return failures;
+    }
 
 
 }
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/DefaultFlowEntry.java b/core/api/src/main/java/org/onlab/onos/net/flow/DefaultFlowEntry.java
index 5a0f55b..d4657d2 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/DefaultFlowEntry.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/DefaultFlowEntry.java
@@ -17,6 +17,10 @@
 
     private long lastSeen = -1;
 
+    private final int errType;
+
+    private final int errCode;
+
 
     public DefaultFlowEntry(DeviceId deviceId, TrafficSelector selector,
             TrafficTreatment treatment, int priority, FlowEntryState state,
@@ -27,6 +31,8 @@
         this.life = life;
         this.packets = packets;
         this.bytes = bytes;
+        this.errCode = -1;
+        this.errType = -1;
         this.lastSeen = System.currentTimeMillis();
     }
 
@@ -37,6 +43,8 @@
         this.life = life;
         this.packets = packets;
         this.bytes = bytes;
+        this.errCode = -1;
+        this.errType = -1;
         this.lastSeen = System.currentTimeMillis();
     }
 
@@ -46,9 +54,18 @@
         this.life = 0;
         this.packets = 0;
         this.bytes = 0;
+        this.errCode = -1;
+        this.errType = -1;
         this.lastSeen = System.currentTimeMillis();
     }
 
+    public DefaultFlowEntry(FlowRule rule, int errType, int errCode) {
+        super(rule);
+        this.state = FlowEntryState.FAILED;
+        this.errType = errType;
+        this.errCode = errCode;
+    }
+
     @Override
     public long life() {
         return life;
@@ -100,6 +117,16 @@
     }
 
     @Override
+    public int errType() {
+        return this.errType;
+    }
+
+    @Override
+    public int errCode() {
+        return this.errCode;
+    }
+
+    @Override
     public String toString() {
         return toStringHelper(this)
                 .add("rule", super.toString())
@@ -108,4 +135,6 @@
     }
 
 
+
+
 }
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowEntry.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowEntry.java
index 5b5f89b..882c9df 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowEntry.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowEntry.java
@@ -29,7 +29,12 @@
         /**
          * Flow has been removed from flow table and can be purged.
          */
-        REMOVED
+        REMOVED,
+
+        /**
+         * Indicates that the installation of this flow has failed.
+         */
+        FAILED
     }
 
     /**
@@ -95,4 +100,16 @@
      */
     void setBytes(long bytes);
 
+    /**
+     * Indicates the error type.
+     * @return an integer value of the error
+     */
+    int errType();
+
+    /**
+     * Indicates the error code.
+     * @return an integer value of the error
+     */
+    int errCode();
+
 }
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
index 68762ac..3592e39 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
@@ -37,6 +37,12 @@
      */
     void removeRulesById(ApplicationId id, FlowRule... flowRules);
 
-    Future<Void> executeBatch(BatchOperation<FlowRuleBatchEntry> batch);
+    /**
+     * Installs a batch of flow rules. Each flowrule is associated to an
+     * operation which results in either addition, removal or modification.
+     * @param batch a batch of flow rules
+     * @return a future indicating the status of this execution
+     */
+    Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch);
 
 }
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
index a9eddd8..a897cbb 100644
--- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
@@ -5,10 +5,12 @@
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -26,6 +28,7 @@
 import org.onlab.onos.net.flow.FlowEntry;
 import org.onlab.onos.net.flow.FlowRule;
 import org.onlab.onos.net.flow.FlowRuleBatchEntry;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
 import org.onlab.onos.net.flow.FlowRuleBatchOperation;
 import org.onlab.onos.net.flow.FlowRuleEvent;
 import org.onlab.onos.net.flow.FlowRuleListener;
@@ -52,6 +55,8 @@
         extends AbstractProviderRegistry<FlowRuleProvider, FlowRuleProviderService>
         implements FlowRuleService, FlowRuleProviderRegistry {
 
+    enum BatchState { STARTED, FINISHED, CANCELLED };
+
     public static final String FLOW_RULE_NULL = "FlowRule cannot be null";
     private final Logger log = getLogger(getClass());
 
@@ -144,7 +149,7 @@
             FlowRuleBatchOperation batch) {
         Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches =
                 ArrayListMultimap.create();
-        List<Future<Void>> futures = Lists.newArrayList();
+        List<Future<CompletedBatchOperation>> futures = Lists.newArrayList();
         for (FlowRuleBatchEntry fbe : batch.getOperations()) {
             final FlowRule f = fbe.getTarget();
             final Device device = deviceService.getDevice(f.deviceId());
@@ -165,10 +170,10 @@
         for (FlowRuleProvider provider : batches.keySet()) {
             FlowRuleBatchOperation b =
                     new FlowRuleBatchOperation(batches.get(provider));
-            Future<Void> future = provider.executeBatch(b);
+            Future<CompletedBatchOperation> future = provider.executeBatch(b);
             futures.add(future);
         }
-        return new FlowRuleBatchFuture(futures);
+        return new FlowRuleBatchFuture(futures, batches);
     }
 
     @Override
@@ -341,59 +346,140 @@
     private class FlowRuleBatchFuture
         implements Future<CompletedBatchOperation> {
 
-        private final List<Future<Void>> futures;
+        private final List<Future<CompletedBatchOperation>> futures;
+        private final Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches;
+        private final AtomicReference<BatchState> state;
+        private CompletedBatchOperation overall;
 
-        public FlowRuleBatchFuture(List<Future<Void>> futures) {
+
+
+        public FlowRuleBatchFuture(List<Future<CompletedBatchOperation>> futures,
+                Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches) {
             this.futures = futures;
+            this.batches = batches;
+            state = new AtomicReference<FlowRuleManager.BatchState>();
+            state.set(BatchState.STARTED);
         }
 
         @Override
         public boolean cancel(boolean mayInterruptIfRunning) {
-            // TODO Auto-generated method stub
-            return false;
+            if (state.get() == BatchState.FINISHED) {
+                return false;
+            }
+            if (!state.compareAndSet(BatchState.STARTED, BatchState.CANCELLED)) {
+                return false;
+            }
+            cleanUpBatch();
+            for (Future<CompletedBatchOperation> f : futures) {
+                f.cancel(mayInterruptIfRunning);
+            }
+            return true;
         }
 
         @Override
         public boolean isCancelled() {
-            // TODO Auto-generated method stub
-            return false;
+            return state.get() == BatchState.CANCELLED;
         }
 
         @Override
         public boolean isDone() {
-            boolean isDone = true;
-            for (Future<Void> future : futures) {
-                isDone &= future.isDone();
-            }
-            return isDone;
+            return state.get() == BatchState.FINISHED;
         }
 
+
         @Override
         public CompletedBatchOperation get() throws InterruptedException,
-        ExecutionException {
-            // TODO Auto-generated method stub
-            for (Future<Void> future : futures) {
-                future.get();
+            ExecutionException {
+
+            if (isDone()) {
+                return overall;
             }
-            return new CompletedBatchOperation();
+
+            boolean success = true;
+            List<FlowEntry> failed = Lists.newLinkedList();
+            CompletedBatchOperation completed;
+            for (Future<CompletedBatchOperation> future : futures) {
+                completed = future.get();
+                success = validateBatchOperation(failed, completed, future);
+            }
+
+            return finalizeBatchOperation(success, failed);
+
         }
 
         @Override
         public CompletedBatchOperation get(long timeout, TimeUnit unit)
                 throws InterruptedException, ExecutionException,
                 TimeoutException {
-            // TODO we should decrement the timeout
+
+            if (isDone()) {
+                return overall;
+            }
+            boolean success = true;
+            List<FlowEntry> failed = Lists.newLinkedList();
+            CompletedBatchOperation completed;
             long start = System.nanoTime();
             long end = start + unit.toNanos(timeout);
-            for (Future<Void> future : futures) {
+
+            for (Future<CompletedBatchOperation> future : futures) {
                 long now = System.nanoTime();
                 long thisTimeout = end - now;
-                future.get(thisTimeout, TimeUnit.NANOSECONDS);
+                completed = future.get(thisTimeout, TimeUnit.NANOSECONDS);
+                success = validateBatchOperation(failed, completed, future);
             }
-            return new CompletedBatchOperation();
+            return finalizeBatchOperation(success, failed);
         }
 
+        private boolean validateBatchOperation(List<FlowEntry> failed,
+                CompletedBatchOperation completed,
+                Future<CompletedBatchOperation> future) {
+
+            if (isCancelled()) {
+                throw new CancellationException();
+            }
+            if (!completed.isSuccess()) {
+                failed.addAll(completed.failedItems());
+                cleanUpBatch();
+                cancelAllSubBatches();
+                return false;
+            }
+            return true;
+        }
+
+        private void cancelAllSubBatches() {
+            for (Future<CompletedBatchOperation> f : futures) {
+                f.cancel(true);
+            }
+        }
+
+        private CompletedBatchOperation finalizeBatchOperation(boolean success,
+                List<FlowEntry> failed) {
+            synchronized (overall) {
+                if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
+                    if (state.get() == BatchState.FINISHED) {
+                        return overall;
+                    }
+                    throw new CancellationException();
+                }
+                overall = new CompletedBatchOperation(success, failed);
+                return overall;
+            }
+        }
+
+        private void cleanUpBatch() {
+            for (FlowRuleBatchEntry fbe : batches.values()) {
+                if (fbe.getOperator() == FlowRuleOperation.ADD ||
+                        fbe.getOperator() == FlowRuleOperation.MODIFY) {
+                    store.deleteFlowRule(fbe.getTarget());
+                } else if (fbe.getOperator() == FlowRuleOperation.REMOVE) {
+                    store.storeFlowRule(fbe.getTarget());
+                }
+            }
+
+        }
     }
 
 
+
+
 }
diff --git a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
index 86f3ddc..5b363da 100644
--- a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
+++ b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
@@ -28,6 +28,7 @@
 import org.onlab.onos.net.PortNumber;
 import org.onlab.onos.net.device.DeviceListener;
 import org.onlab.onos.net.device.DeviceService;
+import org.onlab.onos.net.flow.CompletedBatchOperation;
 import org.onlab.onos.net.flow.DefaultFlowEntry;
 import org.onlab.onos.net.flow.DefaultFlowRule;
 import org.onlab.onos.net.flow.FlowEntry;
@@ -408,7 +409,7 @@
         }
 
         @Override
-        public Future<Void> executeBatch(
+        public Future<CompletedBatchOperation> executeBatch(
                 BatchOperation<FlowRuleBatchEntry> batch) {
             // TODO Auto-generated method stub
             return null;
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
index 78f5874..9568f1f 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
@@ -27,6 +27,8 @@
 import org.onlab.onos.net.flow.instructions.L3ModificationInstruction;
 import org.onlab.onos.net.flow.instructions.L3ModificationInstruction.ModIPInstruction;
 import org.projectfloodlight.openflow.protocol.OFFactory;
+import org.projectfloodlight.openflow.protocol.OFFlowAdd;
+import org.projectfloodlight.openflow.protocol.OFFlowDelete;
 import org.projectfloodlight.openflow.protocol.OFFlowMod;
 import org.projectfloodlight.openflow.protocol.OFFlowModFlags;
 import org.projectfloodlight.openflow.protocol.action.OFAction;
@@ -68,12 +70,13 @@
         this.cookie = flowRule.id();
     }
 
-    public OFFlowMod buildFlowAdd() {
+    public OFFlowAdd buildFlowAdd() {
         Match match = buildMatch();
         List<OFAction> actions = buildActions();
 
         //TODO: what to do without bufferid? do we assume that there will be a pktout as well?
-        OFFlowMod fm = factory.buildFlowAdd()
+        OFFlowAdd fm = factory.buildFlowAdd()
+                .setXid(cookie.value())
                 .setCookie(U64.of(cookie.value()))
                 .setBufferId(OFBufferId.NO_BUFFER)
                 .setActions(actions)
@@ -92,6 +95,7 @@
 
         //TODO: what to do without bufferid? do we assume that there will be a pktout as well?
         OFFlowMod fm = factory.buildFlowModify()
+                .setXid(cookie.value())
                 .setCookie(U64.of(cookie.value()))
                 .setBufferId(OFBufferId.NO_BUFFER)
                 .setActions(actions)
@@ -104,11 +108,12 @@
 
     }
 
-    public OFFlowMod buildFlowDel() {
+    public OFFlowDelete buildFlowDel() {
         Match match = buildMatch();
         List<OFAction> actions = buildActions();
 
-        OFFlowMod fm = factory.buildFlowDelete()
+        OFFlowDelete fm = factory.buildFlowDelete()
+                .setXid(cookie.value())
                 .setCookie(U64.of(cookie.value()))
                 .setBufferId(OFBufferId.NO_BUFFER)
                 .setActions(actions)
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
index 0aca754..2da09fe 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -2,6 +2,7 @@
 
 import static org.slf4j.LoggerFactory.getLogger;
 
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -21,9 +22,12 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.onlab.onos.ApplicationId;
 import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.flow.CompletedBatchOperation;
+import org.onlab.onos.net.flow.DefaultFlowEntry;
 import org.onlab.onos.net.flow.FlowEntry;
 import org.onlab.onos.net.flow.FlowRule;
 import org.onlab.onos.net.flow.FlowRuleBatchEntry;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
 import org.onlab.onos.net.flow.FlowRuleProvider;
 import org.onlab.onos.net.flow.FlowRuleProviderRegistry;
 import org.onlab.onos.net.flow.FlowRuleProviderService;
@@ -40,6 +44,7 @@
 import org.projectfloodlight.openflow.protocol.OFActionType;
 import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
 import org.projectfloodlight.openflow.protocol.OFErrorMsg;
+import org.projectfloodlight.openflow.protocol.OFFlowMod;
 import org.projectfloodlight.openflow.protocol.OFFlowRemoved;
 import org.projectfloodlight.openflow.protocol.OFFlowStatsEntry;
 import org.projectfloodlight.openflow.protocol.OFFlowStatsReply;
@@ -52,6 +57,11 @@
 import org.projectfloodlight.openflow.protocol.OFVersion;
 import org.projectfloodlight.openflow.protocol.action.OFAction;
 import org.projectfloodlight.openflow.protocol.action.OFActionOutput;
+import org.projectfloodlight.openflow.protocol.errormsg.OFBadActionErrorMsg;
+import org.projectfloodlight.openflow.protocol.errormsg.OFBadInstructionErrorMsg;
+import org.projectfloodlight.openflow.protocol.errormsg.OFBadMatchErrorMsg;
+import org.projectfloodlight.openflow.protocol.errormsg.OFBadRequestErrorMsg;
+import org.projectfloodlight.openflow.protocol.errormsg.OFFlowModFailedErrorMsg;
 import org.projectfloodlight.openflow.protocol.instruction.OFInstruction;
 import org.projectfloodlight.openflow.protocol.instruction.OFInstructionApplyActions;
 import org.projectfloodlight.openflow.types.OFPort;
@@ -70,6 +80,8 @@
 @Component(immediate = true)
 public class OpenFlowRuleProvider extends AbstractProvider implements FlowRuleProvider {
 
+    enum BatchState { STARTED, FINISHED, CANCELLED };
+
     private final Logger log = getLogger(getClass());
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -88,6 +100,9 @@
     private final Map<Long, InstallationFuture> pendingFutures =
             new ConcurrentHashMap<Long, InstallationFuture>();
 
+    private final Map<Long, InstallationFuture> pendingFMs =
+            new ConcurrentHashMap<Long, InstallationFuture>();
+
     /**
      * Creates an OpenFlow host provider.
      */
@@ -143,9 +158,47 @@
         removeFlowRule(flowRules);
     }
 
+    @Override
+    public Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
+        final Set<Dpid> sws = new HashSet<Dpid>();
+        final Map<Long, FlowRuleBatchEntry> fmXids = new HashMap<Long, FlowRuleBatchEntry>();
+        OFFlowMod mod = null;
+        for (FlowRuleBatchEntry fbe : batch.getOperations()) {
+            FlowRule flowRule = fbe.getTarget();
+            OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
+            sws.add(new Dpid(sw.getId()));
+            FlowModBuilder builder = new FlowModBuilder(flowRule, sw.factory());
+            switch (fbe.getOperator()) {
+                case ADD:
+                    mod = builder.buildFlowAdd();
+                    break;
+                case REMOVE:
+                    mod = builder.buildFlowDel();
+                    break;
+                case MODIFY:
+                    mod = builder.buildFlowMod();
+                    break;
+                default:
+                    log.error("Unsupported batch operation {}", fbe.getOperator());
+            }
+            if (mod != null) {
+                sw.sendMsg(mod);
+                fmXids.put(mod.getXid(), fbe);
+            } else {
+                log.error("Conversion of flowrule {} failed.", flowRule);
+            }
 
-    //TODO: InternalFlowRuleProvider listening to stats and error and flowremoved.
-    // possibly barriers as well. May not be internal at all...
+        }
+        InstallationFuture installation = new InstallationFuture(sws, fmXids);
+        for (Long xid : fmXids.keySet()) {
+            pendingFMs.put(xid, installation);
+        }
+        pendingFutures.put(U32.f(batch.hashCode()), installation);
+        installation.verify(batch.hashCode());
+        return installation;
+    }
+
+
     private class InternalFlowProvider
     implements OpenFlowSwitchListener, OpenFlowEventListener {
 
@@ -175,7 +228,6 @@
             InstallationFuture future = null;
             switch (msg.getType()) {
             case FLOW_REMOVED:
-                //TODO: make this better
                 OFFlowRemoved removed = (OFFlowRemoved) msg;
 
                 FlowEntry fr = new FlowEntryBuilder(dpid, removed).build();
@@ -191,7 +243,7 @@
                 }
                 break;
             case ERROR:
-                future = pendingFutures.get(msg.getXid());
+                future = pendingFMs.get(msg.getXid());
                 if (future != null) {
                     future.fail((OFErrorMsg) msg, dpid);
                 }
@@ -203,10 +255,7 @@
         }
 
         @Override
-        public void roleAssertFailed(Dpid dpid, RoleState role) {
-            // TODO Auto-generated method stub
-
-        }
+        public void roleAssertFailed(Dpid dpid, RoleState role) {}
 
         private synchronized void pushFlowMetrics(Dpid dpid, OFStatsReply stats) {
             if (stats.getStatsType() != OFStatsType.FLOW) {
@@ -230,7 +279,6 @@
         }
 
         private boolean tableMissRule(Dpid dpid, OFFlowStatsEntry reply) {
-            // TODO NEED TO FIND A BETTER WAY TO AVOID DOING THIS
             if (reply.getVersion().equals(OFVersion.OF_10) ||
                     reply.getMatch().getMatchFields().iterator().hasNext()) {
                 return false;
@@ -251,104 +299,91 @@
             }
             return false;
         }
-
     }
 
-
-    @Override
-    public Future<Void> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
-        final Set<Dpid> sws = new HashSet<Dpid>();
-
-        for (FlowRuleBatchEntry fbe : batch.getOperations()) {
-            FlowRule flowRule = fbe.getTarget();
-            OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
-            sws.add(new Dpid(sw.getId()));
-            switch (fbe.getOperator()) {
-                case ADD:
-                  //TODO: Track XID for each flowmod
-                    sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowAdd());
-                    break;
-                case REMOVE:
-                  //TODO: Track XID for each flowmod
-                    sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowDel());
-                    break;
-                case MODIFY:
-                  //TODO: Track XID for each flowmod
-                    sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowMod());
-                    break;
-                default:
-                    log.error("Unsupported batch operation {}", fbe.getOperator());
-            }
-        }
-        InstallationFuture installation = new InstallationFuture(sws);
-        pendingFutures.put(U32.f(batch.hashCode()), installation);
-        installation.verify(batch.hashCode());
-        return installation;
-    }
-
-    private class InstallationFuture implements Future<Void> {
+    private class InstallationFuture implements Future<CompletedBatchOperation> {
 
         private final Set<Dpid> sws;
         private final AtomicBoolean ok = new AtomicBoolean(true);
+        private final Map<Long, FlowRuleBatchEntry> fms;
+
         private final List<FlowEntry> offendingFlowMods = Lists.newLinkedList();
 
         private final CountDownLatch countDownLatch;
+        private Integer pendingXid;
+        private BatchState state;
 
-        public InstallationFuture(Set<Dpid> sws) {
+        public InstallationFuture(Set<Dpid> sws, Map<Long, FlowRuleBatchEntry> fmXids) {
+            this.state = BatchState.STARTED;
             this.sws = sws;
+            this.fms = fmXids;
             countDownLatch = new CountDownLatch(sws.size());
         }
 
         public void fail(OFErrorMsg msg, Dpid dpid) {
             ok.set(false);
-            //TODO add reason to flowentry
+            FlowEntry fe = null;
+            FlowRuleBatchEntry fbe = fms.get(msg.getXid());
+            FlowRule offending = fbe.getTarget();
             //TODO handle specific error msgs
-            //offendingFlowMods.add(new FlowEntryBuilder(dpid, msg.));
             switch (msg.getErrType()) {
                 case BAD_ACTION:
+                    OFBadActionErrorMsg bad = (OFBadActionErrorMsg) msg;
+                    fe = new DefaultFlowEntry(offending, bad.getErrType().ordinal(),
+                            bad.getCode().ordinal());
                     break;
                 case BAD_INSTRUCTION:
+                    OFBadInstructionErrorMsg badins = (OFBadInstructionErrorMsg) msg;
+                    fe = new DefaultFlowEntry(offending, badins.getErrType().ordinal(),
+                            badins.getCode().ordinal());
                     break;
                 case BAD_MATCH:
+                    OFBadMatchErrorMsg badMatch = (OFBadMatchErrorMsg) msg;
+                    fe = new DefaultFlowEntry(offending, badMatch.getErrType().ordinal(),
+                            badMatch.getCode().ordinal());
                     break;
                 case BAD_REQUEST:
-                    break;
-                case EXPERIMENTER:
+                    OFBadRequestErrorMsg badReq = (OFBadRequestErrorMsg) msg;
+                    fe = new DefaultFlowEntry(offending, badReq.getErrType().ordinal(),
+                            badReq.getCode().ordinal());
                     break;
                 case FLOW_MOD_FAILED:
+                    OFFlowModFailedErrorMsg fmFail = (OFFlowModFailedErrorMsg) msg;
+                    fe = new DefaultFlowEntry(offending, fmFail.getErrType().ordinal(),
+                            fmFail.getCode().ordinal());
                     break;
+                case EXPERIMENTER:
                 case GROUP_MOD_FAILED:
-                    break;
                 case HELLO_FAILED:
-                    break;
                 case METER_MOD_FAILED:
-                    break;
                 case PORT_MOD_FAILED:
-                    break;
                 case QUEUE_OP_FAILED:
-                    break;
                 case ROLE_REQUEST_FAILED:
-                    break;
                 case SWITCH_CONFIG_FAILED:
-                    break;
                 case TABLE_FEATURES_FAILED:
-                    break;
                 case TABLE_MOD_FAILED:
+                    fe = new DefaultFlowEntry(offending, msg.getErrType().ordinal(), 0);
                     break;
                 default:
-                    break;
+                    log.error("Unknown error type {}", msg.getErrType());
 
             }
+            offendingFlowMods.add(fe);
 
         }
 
+
         public void satisfyRequirement(Dpid dpid) {
             log.warn("Satisfaction from switch {}", dpid);
-            sws.remove(controller.getSwitch(dpid));
+            sws.remove(dpid);
             countDownLatch.countDown();
+            cleanUp();
+
         }
 
+
         public void verify(Integer id) {
+            pendingXid = id;
             for (Dpid dpid : sws) {
                 OpenFlowSwitch sw = controller.getSwitch(dpid);
                 OFBarrierRequest.Builder builder = sw.factory()
@@ -356,41 +391,57 @@
                         .setXid(id);
                 sw.sendMsg(builder.build());
             }
-
-
         }
 
         @Override
         public boolean cancel(boolean mayInterruptIfRunning) {
-                // TODO Auto-generated method stub
-                return false;
+            this.state = BatchState.CANCELLED;
+            cleanUp();
+            for (FlowRuleBatchEntry fbe : fms.values()) {
+                if (fbe.getOperator() == FlowRuleOperation.ADD ||
+                        fbe.getOperator() == FlowRuleOperation.MODIFY) {
+                    removeFlowRule(fbe.getTarget());
+                } else if (fbe.getOperator() == FlowRuleOperation.REMOVE) {
+                    applyRule(fbe.getTarget());
+                }
+
+            }
+            return isCancelled();
         }
 
         @Override
         public boolean isCancelled() {
-            // TODO Auto-generated method stub
-            return false;
+            return this.state == BatchState.CANCELLED;
         }
 
         @Override
         public boolean isDone() {
-            return sws.isEmpty();
+            return this.state == BatchState.FINISHED;
         }
 
         @Override
-        public Void get() throws InterruptedException, ExecutionException {
+        public CompletedBatchOperation get() throws InterruptedException, ExecutionException {
             countDownLatch.await();
-            //return offendingFlowMods;
-            return null;
+            this.state = BatchState.FINISHED;
+            return new CompletedBatchOperation(ok.get(), offendingFlowMods);
         }
 
         @Override
-        public Void get(long timeout, TimeUnit unit)
+        public CompletedBatchOperation get(long timeout, TimeUnit unit)
                 throws InterruptedException, ExecutionException,
                 TimeoutException {
             countDownLatch.await(timeout, unit);
-            //return offendingFlowMods;
-            return null;
+            this.state = BatchState.FINISHED;
+            return new CompletedBatchOperation(ok.get(), offendingFlowMods);
+        }
+
+        private void cleanUp() {
+            if (sws.isEmpty()) {
+                pendingFutures.remove(pendingXid);
+                for (Long xid : fms.keySet()) {
+                    pendingFMs.remove(xid);
+                }
+            }
         }
 
     }