Implementation of new Flow Subsystem:

The subsystem no longer returns futures for tracking completion of work.
Notifications are explicitely returned via a call back mechanism. Also, the
subsystem is now asynchronous.

Change-Id: I1a4cef931c24820f9ae9ed9a5398f163f05dfbc9

more flowservice improvements

Change-Id: I5c9c1b6be4b2ebfa523b64f6f52e7634b7d3e05f

more flowservice impl

Change-Id: I05f6774460effb53ced8c36844bcda2f8f6c096f

Manager to store functional (at least i believe it)

Change-Id: I09b04989bd1004c98fe0bafed4c76714b9155d53

flow subsystem functional: need to fix unit tests

Change-Id: I1667f25b91320f625a03e5e1d5e92823184d9de0

flow subsystem functional

Change-Id: I429b3335c16d4fc16f5d55f233dd37c4d1d6111d

finished refactor of flow subsystem

Change-Id: I1899abc6ff6a974a2018d936cc555049c70a6804

fix for null flow provider to use new api

Change-Id: If2fd9bd5baf74d9c61c5c8085cef8bc2d204cbdc
diff --git a/cli/src/main/java/org/onosproject/cli/net/AddFlowsCommand.java b/cli/src/main/java/org/onosproject/cli/net/AddFlowsCommand.java
index a5fc611..9c4700c 100644
--- a/cli/src/main/java/org/onosproject/cli/net/AddFlowsCommand.java
+++ b/cli/src/main/java/org/onosproject/cli/net/AddFlowsCommand.java
@@ -19,29 +19,30 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import org.apache.commons.lang.math.RandomUtils;
 import org.apache.karaf.shell.commands.Argument;
 import org.apache.karaf.shell.commands.Command;
+import org.onlab.packet.MacAddress;
 import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
 import org.onosproject.net.Device;
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.device.DeviceService;
-import org.onosproject.net.flow.CompletedBatchOperation;
 import org.onosproject.net.flow.DefaultFlowRule;
 import org.onosproject.net.flow.DefaultTrafficSelector;
 import org.onosproject.net.flow.DefaultTrafficTreatment;
-import org.onosproject.net.flow.FlowRuleBatchEntry;
-import org.onosproject.net.flow.FlowRuleBatchOperation;
+import org.onosproject.net.flow.FlowRuleOperations;
+import org.onosproject.net.flow.FlowRuleOperationsContext;
 import org.onosproject.net.flow.FlowRuleService;
 import org.onosproject.net.flow.TrafficSelector;
 import org.onosproject.net.flow.TrafficTreatment;
-import org.onlab.packet.MacAddress;
 
 import java.util.ArrayList;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Installs many many flows.
@@ -50,6 +51,8 @@
          description = "Installs a number of test flow rules - for testing only")
 public class AddFlowsCommand extends AbstractShellCommand {
 
+    private CountDownLatch latch;
+
     @Argument(index = 0, name = "flowPerDevice", description = "Number of flows to add per device",
               required = true, multiValued = false)
     String flows = null;
@@ -63,6 +66,9 @@
 
         FlowRuleService flowService = get(FlowRuleService.class);
         DeviceService deviceService = get(DeviceService.class);
+        CoreService coreService = get(CoreService.class);
+
+        ApplicationId appId = coreService.registerApplication("onos.test.flow.installer");
 
         int flowsPerDevice = Integer.parseInt(flows);
         int num = Integer.parseInt(numOfRuns);
@@ -70,49 +76,73 @@
         ArrayList<Long> results = Lists.newArrayList();
         Iterable<Device> devices = deviceService.getDevices();
         TrafficTreatment treatment = DefaultTrafficTreatment.builder()
-                .setOutput(PortNumber.portNumber(1)).build();
+                .setOutput(PortNumber.portNumber(RandomUtils.nextInt())).build();
         TrafficSelector.Builder sbuilder;
-        Set<FlowRuleBatchEntry> rules = Sets.newHashSet();
-        Set<FlowRuleBatchEntry> remove = Sets.newHashSet();
+        FlowRuleOperations.Builder rules = FlowRuleOperations.builder();
+        FlowRuleOperations.Builder remove = FlowRuleOperations.builder();
+
         for (Device d : devices) {
             for (int i = 0; i < flowsPerDevice; i++) {
                 sbuilder = DefaultTrafficSelector.builder();
-                sbuilder.matchEthSrc(MacAddress.valueOf(i))
-                        .matchEthDst(MacAddress.valueOf(Integer.MAX_VALUE - i));
-                rules.add(new FlowRuleBatchEntry(FlowRuleBatchEntry.FlowRuleOperation.ADD,
-                                                    new DefaultFlowRule(d.id(), sbuilder.build(), treatment,
-                                                                        100, (long) 0, 10, false)));
-                remove.add(new FlowRuleBatchEntry(FlowRuleBatchEntry.FlowRuleOperation.REMOVE,
-                                                 new DefaultFlowRule(d.id(), sbuilder.build(), treatment,
-                                                                     100, (long) 0, 10, false)));
+
+                sbuilder.matchEthSrc(MacAddress.valueOf(RandomUtils.nextInt() * i))
+                        .matchEthDst(MacAddress.valueOf((Integer.MAX_VALUE - i) * RandomUtils.nextInt()));
+
+
+                int randomPriority = RandomUtils.nextInt();
+                rules.add(new DefaultFlowRule(d.id(), sbuilder.build(), treatment,
+                                              randomPriority, appId, 10, false));
+                remove.remove(new DefaultFlowRule(d.id(), sbuilder.build(), treatment,
+                                                  randomPriority, appId, 10, false));
 
             }
         }
-        boolean isSuccess = true;
+
         for (int i = 0; i < num; i++) {
-            long startTime = System.currentTimeMillis();
-            Future<CompletedBatchOperation> op = flowService.applyBatch(
-                    new FlowRuleBatchOperation(rules));
+
+            latch = new CountDownLatch(2);
+            flowService.apply(rules.build(new FlowRuleOperationsContext() {
+
+                private final Stopwatch timer = Stopwatch.createStarted();
+
+                @Override
+                public void onSuccess(FlowRuleOperations ops) {
+
+                    timer.stop();
+                    results.add(timer.elapsed(TimeUnit.MILLISECONDS));
+                    if (results.size() == num) {
+                        if (outputJson()) {
+                            print("%s", json(new ObjectMapper(), true, results));
+                        } else {
+                            printTime(true, results);
+                        }
+                    }
+                    latch.countDown();
+                }
+            }));
+
+
+            flowService.apply(remove.build(new FlowRuleOperationsContext() {
+                @Override
+                public void onSuccess(FlowRuleOperations ops) {
+                    latch.countDown();
+                }
+            }));
             try {
-                isSuccess &= op.get().isSuccess();
-            } catch (InterruptedException | ExecutionException e) {
+                latch.await();
+            } catch (InterruptedException e) {
                 e.printStackTrace();
             }
-            long endTime = System.currentTimeMillis();
-            results.add(endTime - startTime);
-            flowService.applyBatch(
-                    new FlowRuleBatchOperation(remove));
+
         }
-        if (outputJson()) {
-            print("%s", json(new ObjectMapper(), isSuccess, results));
-        } else {
-            printTime(isSuccess, results);
-        }
+
 
 
 
     }
 
+
+
     private Object json(ObjectMapper mapper, boolean isSuccess, ArrayList<Long> elapsed) {
         ObjectNode result = mapper.createObjectNode();
         result.put("Success", isSuccess);
diff --git a/core/api/src/main/java/org/onosproject/net/flow/CompletedBatchOperation.java b/core/api/src/main/java/org/onosproject/net/flow/CompletedBatchOperation.java
index a17f7a3..fce7ee9 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/CompletedBatchOperation.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/CompletedBatchOperation.java
@@ -21,6 +21,7 @@
 
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableSet;
+import org.onosproject.net.DeviceId;
 
 /**
  * Representation of a completed flow rule batch operation.
@@ -30,19 +31,22 @@
     private final boolean success;
     private final Set<FlowRule> failures;
     private final Set<Long> failedIds;
+    private final DeviceId deviceId;
 
     /**
      * Creates a new batch completion result.
      *
-     * @param success  indicates whether the completion is successful.
+     * @param success  indicates whether the completion is successful
      * @param failures set of any failures encountered
      * @param failedIds (optional) set of failed operation ids
+     * @param deviceId the device this operation completed for
      */
     public CompletedBatchOperation(boolean success, Set<? extends FlowRule> failures,
-                                   Set<Long> failedIds) {
+                                   Set<Long> failedIds, DeviceId deviceId) {
         this.success = success;
         this.failures = ImmutableSet.copyOf(failures);
         this.failedIds = ImmutableSet.copyOf(failedIds);
+        this.deviceId = deviceId;
     }
 
     /**
@@ -51,10 +55,12 @@
      * @param success  indicates whether the completion is successful.
      * @param failures set of any failures encountered
      */
-    public CompletedBatchOperation(boolean success, Set<? extends FlowRule> failures) {
+    public CompletedBatchOperation(boolean success, Set<? extends FlowRule> failures,
+                                   DeviceId deviceId) {
         this.success = success;
         this.failures = ImmutableSet.copyOf(failures);
         this.failedIds = Collections.emptySet();
+        this.deviceId = deviceId;
     }
 
 
@@ -73,12 +79,17 @@
         return failedIds;
     }
 
+    public DeviceId deviceId() {
+        return this.deviceId;
+    }
+
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(getClass())
                 .add("success?", success)
                 .add("failedItems", failures)
                 .add("failedIds", failedIds)
+                .add("deviceId", deviceId)
                 .toString();
     }
 }
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchEvent.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchEvent.java
index ab72d8b..9bcace2 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchEvent.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchEvent.java
@@ -16,12 +16,14 @@
 package org.onosproject.net.flow;
 
 import org.onosproject.event.AbstractEvent;
+import org.onosproject.net.DeviceId;
 
 /**
  * Describes flow rule batch event.
  */
 public final class FlowRuleBatchEvent extends AbstractEvent<FlowRuleBatchEvent.Type, FlowRuleBatchRequest> {
 
+
     /**
      * Type of flow rule events.
      */
@@ -42,14 +44,17 @@
     }
 
     private final CompletedBatchOperation result;
+    private final DeviceId deviceId;
 
     /**
      * Constructs a new FlowRuleBatchEvent.
-     * @param request batch operation request.
+     *
+     * @param request batch operation request
+     * @param deviceId the device this batch will be processed on
      * @return event.
      */
-    public static FlowRuleBatchEvent requested(FlowRuleBatchRequest request) {
-        FlowRuleBatchEvent event = new FlowRuleBatchEvent(Type.BATCH_OPERATION_REQUESTED, request, null);
+    public static FlowRuleBatchEvent requested(FlowRuleBatchRequest request, DeviceId deviceId) {
+        FlowRuleBatchEvent event = new FlowRuleBatchEvent(Type.BATCH_OPERATION_REQUESTED, request, deviceId);
         return event;
     }
 
@@ -73,13 +78,36 @@
     }
 
     /**
+     * Returns the deviceId for this batch.
+     * @return device id
+     */
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+
+    /**
      * Creates an event of a given type and for the specified flow rule batch.
      *
      * @param type    flow rule batch event type
-     * @param batch    event flow rule batch subject
+     * @param request event flow rule batch subject
+     * @param result  the result of the batch operation
      */
     private FlowRuleBatchEvent(Type type, FlowRuleBatchRequest request, CompletedBatchOperation result) {
         super(type, request);
         this.result = result;
+        this.deviceId = result.deviceId();
+    }
+
+    /**
+     * Creates an event of a given type and for the specified flow rule batch.
+     *
+     * @param type      flow rule batch event type
+     * @param request   event flow rule batch subject
+     * @param deviceId  the device id for this batch
+     */
+    private FlowRuleBatchEvent(Type type, FlowRuleBatchRequest request, DeviceId deviceId) {
+        super(type, request);
+        this.result = null;
+        this.deviceId = deviceId;
     }
 }
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchOperation.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchOperation.java
index f9bc632..e13acd7 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchOperation.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchOperation.java
@@ -15,12 +15,37 @@
  */
 package org.onosproject.net.flow;
 
+import org.onosproject.net.DeviceId;
+
 import java.util.Collection;
 
+/**
+ * Class used with the flow subsystem to process per device
+ * batches.
+ */
 public class FlowRuleBatchOperation
     extends BatchOperation<FlowRuleBatchEntry> {
 
-    public FlowRuleBatchOperation(Collection<FlowRuleBatchEntry> operations) {
+    /**
+     * This id is used to cary to id of the original
+     * FlowOperations and track where this batch operation
+     * came from. The id is unique cluster wide.
+     */
+    private final long id;
+    private final DeviceId deviceId;
+
+    public FlowRuleBatchOperation(Collection<FlowRuleBatchEntry> operations,
+                                  DeviceId deviceId, long flowOperationId) {
         super(operations);
+        this.id = flowOperationId;
+        this.deviceId = deviceId;
+    }
+
+    public DeviceId deviceId() {
+        return this.deviceId;
+    }
+
+    public long id() {
+        return id;
     }
 }
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchRequest.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchRequest.java
index 35db171..5570a40 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchRequest.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchRequest.java
@@ -15,59 +15,43 @@
  */
 package org.onosproject.net.flow;
 
+import com.google.common.collect.Lists;
+import org.onosproject.net.DeviceId;
+
 import java.util.Collections;
 import java.util.List;
-
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
-
-
-
-import com.google.common.collect.Lists;
+import java.util.Set;
 
 public class FlowRuleBatchRequest {
 
-    private final int batchId;
-    private final List<FlowRuleBatchEntry> toAdd;
-    private final List<FlowRuleBatchEntry> toRemove;
+    /**
+     * This id is used to cary to id of the original
+     * FlowOperations and track where this batch operation
+     * came from. The id is unique cluster wide.
+     */
+    private final long batchId;
 
-    public FlowRuleBatchRequest(int batchId, List<FlowRuleBatchEntry> toAdd,
-                                List<FlowRuleBatchEntry> toRemove) {
+    private final Set<FlowRuleBatchEntry> ops;
+
+
+    public FlowRuleBatchRequest(long batchId, Set<FlowRuleBatchEntry> ops) {
         this.batchId = batchId;
-        this.toAdd = Collections.unmodifiableList(toAdd);
-        this.toRemove = Collections.unmodifiableList(toRemove);
+        this.ops = Collections.unmodifiableSet(ops);
+
+
     }
 
-    public List<FlowRule> toAdd() {
-        return FluentIterable.from(toAdd).transform(
-                new Function<FlowRuleBatchEntry, FlowRule>() {
-
-            @Override
-            public FlowRule apply(FlowRuleBatchEntry input) {
-                return input.target();
-            }
-        }).toList();
+    public Set<FlowRuleBatchEntry> ops() {
+        return ops;
     }
 
-    public List<FlowRule> toRemove() {
-        return FluentIterable.from(toRemove).transform(
-                new Function<FlowRuleBatchEntry, FlowRule>() {
-
-                    @Override
-                    public FlowRule apply(FlowRuleBatchEntry input) {
-                        return input.target();
-                    }
-                }).toList();
-    }
-
-    public FlowRuleBatchOperation asBatchOperation() {
+    public FlowRuleBatchOperation asBatchOperation(DeviceId deviceId) {
         List<FlowRuleBatchEntry> entries = Lists.newArrayList();
-        entries.addAll(toAdd);
-        entries.addAll(toRemove);
-        return new FlowRuleBatchOperation(entries);
+        entries.addAll(ops);
+        return new FlowRuleBatchOperation(entries, deviceId, batchId);
     }
 
-    public int batchId() {
+    public long batchId() {
         return batchId;
     }
 }
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperation.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperation.java
new file mode 100644
index 0000000..82d43be
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperation.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.flow;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Representation of an operation on a flow rule table.
+ */
+public class FlowRuleOperation {
+
+    /**
+     * Type of flow table operations.
+     */
+    public enum Type {
+        ADD,
+        MODIFY,
+        REMOVE
+    }
+
+    private final FlowRule rule;
+    private final Type type;
+
+    public FlowRuleOperation(FlowRule rule, Type type) {
+        this.rule = rule;
+        this.type = type;
+    }
+
+    /**
+     * Returns the type of operation.
+     *
+     * @return type
+     */
+    public Type type() {
+        return type;
+    }
+
+    /**
+     * Returns the flow rule.
+     *
+     * @return flow rule
+     */
+    public FlowRule rule() {
+        return rule;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+                .add("rule", rule)
+                .add("type", type)
+                .toString();
+    }
+}
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperations.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperations.java
new file mode 100644
index 0000000..498cc05
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperations.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.flow;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+import java.util.Set;
+
+import static org.onosproject.net.flow.FlowRuleOperation.Type.*;
+
+/**
+ * A batch of flow rule operations that are broken into stages.
+ * TODO move this up to parent's package
+ */
+public class FlowRuleOperations {
+
+    private final List<Set<FlowRuleOperation>> stages;
+    private final FlowRuleOperationsContext callback; // TODO consider Optional
+
+    private FlowRuleOperations(List<Set<FlowRuleOperation>> stages,
+                               FlowRuleOperationsContext cb) {
+        this.stages = stages;
+        this.callback = cb;
+    }
+
+    // kryo-constructor
+    protected FlowRuleOperations() {
+        this.stages = Lists.newArrayList();
+        this.callback = null;
+    }
+
+    /**
+     * Returns the flow rule operations as sets of stages that should be
+     * executed sequentially.
+     *
+     * @return flow rule stages
+     */
+    public List<Set<FlowRuleOperation>> stages() {
+        return stages;
+    }
+
+    /**
+     * Returns the callback for this batch of operations.
+     *
+     * @return callback
+     */
+    public FlowRuleOperationsContext callback() {
+        return callback;
+    }
+
+    /**
+     * Returns a new builder.
+     *
+     * @return new builder
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+                .add("stages", stages)
+                .toString();
+    }
+
+    /**
+     * A builder for constructing flow rule operations.
+     */
+    public static final class Builder {
+
+        private final ImmutableList.Builder<Set<FlowRuleOperation>> listBuilder = ImmutableList.builder();
+        private ImmutableSet.Builder<FlowRuleOperation> currentStage = ImmutableSet.builder();
+
+        // prevent use of the default constructor outside of this file; use the above method
+        private Builder() {}
+
+        /**
+         * Appends a flow rule add to the current stage.
+         *
+         * @param flowRule flow rule
+         * @return this
+         */
+        public Builder add(FlowRule flowRule) {
+            currentStage.add(new FlowRuleOperation(flowRule, ADD));
+            return this;
+        }
+
+        /**
+         * Appends a flow rule modify to the current stage.
+         *
+         * @param flowRule flow rule
+         * @return this
+         */
+        public Builder modify(FlowRule flowRule) {
+            currentStage.add(new FlowRuleOperation(flowRule, MODIFY));
+            return this;
+        }
+
+        /**
+         * Appends a flow rule remove to the current stage.
+         *
+         * @param flowRule flow rule
+         * @return this
+         */
+        // FIXME this is confusing, consider renaming
+        public Builder remove(FlowRule flowRule) {
+            currentStage.add(new FlowRuleOperation(flowRule, REMOVE));
+            return this;
+        }
+
+        /**
+         * Closes the current stage.
+         */
+        private void closeStage() {
+            ImmutableSet<FlowRuleOperation> stage = currentStage.build();
+            if (!stage.isEmpty()) {
+                listBuilder.add(stage);
+            }
+        }
+
+        /**
+         * Closes the current stage and starts a new one.
+         *
+         * @return this
+         */
+        public Builder newStage() {
+            closeStage();
+            currentStage = ImmutableSet.builder();
+            return this;
+        }
+
+        /**
+         * Builds the immutable flow rule operations.
+         *
+         * @return flow rule operations
+         */
+        public FlowRuleOperations build() {
+            return build(null);
+        }
+
+        /**
+         * Builds the immutable flow rule operations.
+         *
+         * @param cb the callback to call when this operation completes
+         * @return flow rule operations
+         */
+        public FlowRuleOperations build(FlowRuleOperationsContext cb) {
+            closeStage();
+            return new FlowRuleOperations(listBuilder.build(), cb);
+        }
+    }
+}
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperationsContext.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperationsContext.java
new file mode 100644
index 0000000..c405b12
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperationsContext.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.flow;
+
+/**
+ * The context of a flow rule operations that will become the subject of
+ * the notification.
+ *
+ * Implementations of this class must be serializable.
+ */
+public interface FlowRuleOperationsContext {
+    // TODO we might also want to execute a method on behalf of the app
+    default void onSuccess(FlowRuleOperations ops){}
+    default void onError(FlowRuleOperations ops){}
+}
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProvider.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProvider.java
index aae6902..066ac68 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProvider.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProvider.java
@@ -18,8 +18,6 @@
 import org.onosproject.core.ApplicationId;
 import org.onosproject.net.provider.Provider;
 
-import java.util.concurrent.Future;
-
 /**
  * Abstraction of a flow rule provider.
  */
@@ -56,8 +54,7 @@
      * Installs a batch of flow rules. Each flowrule is associated to an
      * operation which results in either addition, removal or modification.
      * @param batch a batch of flow rules
-     * @return a future indicating the status of this execution
      */
-    Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch);
+    void executeBatch(FlowRuleBatchOperation batch);
 
 }
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProviderService.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProviderService.java
index 9456719..4c9ebcb 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProviderService.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProviderService.java
@@ -40,4 +40,13 @@
      */
     void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries);
 
+    /**
+     * Indicates to the core that the requested batch operation has
+     * been completed.
+     *
+     * @param batchId the batch which was processed
+     * @param operation the resulting outcome of the operation
+     */
+    void batchOperationCompleted(long batchId, CompletedBatchOperation operation);
+
 }
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java
index 78772c2..957ecf2 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java
@@ -15,11 +15,11 @@
  */
 package org.onosproject.net.flow;
 
-import java.util.concurrent.Future;
-
 import org.onosproject.core.ApplicationId;
 import org.onosproject.net.DeviceId;
 
+import java.util.concurrent.Future;
+
 /**
  * Service for injecting flow rules into the environment and for obtaining
  * information about flow rules already in the environment. This implements
@@ -30,6 +30,11 @@
 public interface FlowRuleService {
 
     /**
+     * The topic used for obtaining globally unique ids.
+     */
+    static String FLOW_OP_TOPIC = "flow-ops-ids";
+
+    /**
      * Returns the number of flow rules in the system.
      *
      * @return flow rule count
@@ -96,11 +101,20 @@
      * Applies a batch operation of FlowRules.
      *
      * @param batch batch operation to apply
-     * @return future indicating the state of the batch operation
+     * @return future indicating the state of the batch operation, due to the
+     * deprecation of this api the future will immediately return
      */
+    @Deprecated
     Future<CompletedBatchOperation> applyBatch(FlowRuleBatchOperation batch);
 
     /**
+     * Applies a batch operation of FlowRules.
+     *
+     * @param ops batch operation to apply
+     */
+    void apply(FlowRuleOperations ops);
+
+    /**
      * Adds the specified flow rule listener.
      *
      * @param listener flow rule listener
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStore.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStore.java
index 6fca2c3..d3aebba 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStore.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStore.java
@@ -15,8 +15,6 @@
  */
 package org.onosproject.net.flow;
 
-import java.util.concurrent.Future;
-
 import org.onosproject.net.DeviceId;
 import org.onosproject.store.Store;
 
@@ -54,6 +52,7 @@
      *
      * @param rule the flow rule to add
      */
+    @Deprecated
     void storeFlowRule(FlowRule rule);
 
     /**
@@ -61,10 +60,9 @@
      *
      * @param batchOperation batch of flow rules.
      *           A batch can contain flow rules for a single device only.
-     * @return Future response indicating success/failure of the batch operation
-     * all the way down to the device.
+     *
      */
-    Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation batchOperation);
+    void storeBatch(FlowRuleBatchOperation batchOperation);
 
     /**
      * Invoked on the completion of a storeBatch operation.
diff --git a/core/api/src/test/java/org/onosproject/net/flow/FlowRuleBatchOperationTest.java b/core/api/src/test/java/org/onosproject/net/flow/FlowRuleBatchOperationTest.java
index 07b60fa..70fe077 100644
--- a/core/api/src/test/java/org/onosproject/net/flow/FlowRuleBatchOperationTest.java
+++ b/core/api/src/test/java/org/onosproject/net/flow/FlowRuleBatchOperationTest.java
@@ -46,10 +46,10 @@
         final LinkedList<FlowRuleBatchEntry> ops3 = new LinkedList<>();
         ops3.add(entry3);
 
-        final FlowRuleBatchOperation operation1 = new FlowRuleBatchOperation(ops1);
-        final FlowRuleBatchOperation sameAsOperation1 = new FlowRuleBatchOperation(ops1);
-        final FlowRuleBatchOperation operation2 = new FlowRuleBatchOperation(ops2);
-        final FlowRuleBatchOperation operation3 = new FlowRuleBatchOperation(ops3);
+        final FlowRuleBatchOperation operation1 = new FlowRuleBatchOperation(ops1, null, 0);
+        final FlowRuleBatchOperation sameAsOperation1 = new FlowRuleBatchOperation(ops1, null, 0);
+        final FlowRuleBatchOperation operation2 = new FlowRuleBatchOperation(ops2, null, 0);
+        final FlowRuleBatchOperation operation3 = new FlowRuleBatchOperation(ops3, null, 0);
 
         new EqualsTester()
                 .addEqualityGroup(operation1, sameAsOperation1)
diff --git a/core/api/src/test/java/org/onosproject/net/flow/FlowRuleBatchRequestTest.java b/core/api/src/test/java/org/onosproject/net/flow/FlowRuleBatchRequestTest.java
index d7ad903..e6864e1 100644
--- a/core/api/src/test/java/org/onosproject/net/flow/FlowRuleBatchRequestTest.java
+++ b/core/api/src/test/java/org/onosproject/net/flow/FlowRuleBatchRequestTest.java
@@ -15,17 +15,18 @@
  */
 package org.onosproject.net.flow;
 
-import java.util.LinkedList;
-import java.util.List;
-
 import org.junit.Test;
 import org.onosproject.net.intent.IntentTestsMocks;
 
-import static org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation.*;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
+import static org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation.ADD;
+import static org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation.REMOVE;
 
 /**
  * Unit tests for the FlowRuleBatchRequest class.
@@ -40,22 +41,19 @@
     public void testConstruction() {
         final FlowRule rule1 = new IntentTestsMocks.MockFlowRule(1);
         final FlowRule rule2 = new IntentTestsMocks.MockFlowRule(2);
-        final List<FlowRuleBatchEntry> toAdd = new LinkedList<>();
-        toAdd.add(new FlowRuleBatchEntry(ADD, rule1));
-        final List<FlowRuleBatchEntry> toRemove = new LinkedList<>();
-        toRemove.add(new FlowRuleBatchEntry(REMOVE, rule2));
+        final Set<FlowRuleBatchEntry> batch = new HashSet<>();
+        batch.add(new FlowRuleBatchEntry(ADD, rule1));
+
+        batch.add(new FlowRuleBatchEntry(REMOVE, rule2));
 
 
         final FlowRuleBatchRequest request =
-                new FlowRuleBatchRequest(1, toAdd, toRemove);
+                new FlowRuleBatchRequest(1, batch);
 
-        assertThat(request.toAdd(), hasSize(1));
-        assertThat(request.toAdd().get(0), is(rule1));
-        assertThat(request.toRemove(), hasSize(1));
-        assertThat(request.toRemove().get(0), is(rule2));
-        assertThat(request.batchId(), is(1));
+        assertThat(request.ops(), hasSize(2));
+        assertThat(request.batchId(), is(1L));
 
-        final FlowRuleBatchOperation op = request.asBatchOperation();
+        final FlowRuleBatchOperation op = request.asBatchOperation(rule1.deviceId());
         assertThat(op.size(), is(2));
 
         final List<FlowRuleBatchEntry> ops = op.getOperations();
diff --git a/core/api/src/test/java/org/onosproject/net/flow/FlowRuleServiceAdapter.java b/core/api/src/test/java/org/onosproject/net/flow/FlowRuleServiceAdapter.java
index 8f54e85..393f3cf 100644
--- a/core/api/src/test/java/org/onosproject/net/flow/FlowRuleServiceAdapter.java
+++ b/core/api/src/test/java/org/onosproject/net/flow/FlowRuleServiceAdapter.java
@@ -66,6 +66,11 @@
     }
 
     @Override
+    public void apply(FlowRuleOperations ops) {
+
+    }
+
+    @Override
     public void addListener(FlowRuleListener listener) {
 
     }
diff --git a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
index 6d6cd9a..158764f 100644
--- a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
@@ -21,7 +21,7 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
-
+import com.google.common.util.concurrent.Futures;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -29,22 +29,25 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.core.IdGenerator;
 import org.onosproject.event.AbstractListenerRegistry;
 import org.onosproject.event.EventDeliveryService;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.flow.CompletedBatchOperation;
-import org.onosproject.net.flow.DefaultFlowEntry;
 import org.onosproject.net.flow.FlowEntry;
 import org.onosproject.net.flow.FlowRule;
 import org.onosproject.net.flow.FlowRuleBatchEntry;
-import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
 import org.onosproject.net.flow.FlowRuleBatchEvent;
 import org.onosproject.net.flow.FlowRuleBatchOperation;
 import org.onosproject.net.flow.FlowRuleBatchRequest;
 import org.onosproject.net.flow.FlowRuleEvent;
 import org.onosproject.net.flow.FlowRuleListener;
+import org.onosproject.net.flow.FlowRuleOperation;
+import org.onosproject.net.flow.FlowRuleOperations;
+import org.onosproject.net.flow.FlowRuleOperationsContext;
 import org.onosproject.net.flow.FlowRuleProvider;
 import org.onosproject.net.flow.FlowRuleProviderRegistry;
 import org.onosproject.net.flow.FlowRuleProviderService;
@@ -55,18 +58,16 @@
 import org.onosproject.net.provider.AbstractProviderService;
 import org.slf4j.Logger;
 
-import java.util.HashSet;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.onlab.util.Tools.namedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -90,7 +91,16 @@
 
     private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
 
-    private ExecutorService futureService;
+    protected ExecutorService deviceInstallers =
+            Executors.newCachedThreadPool(namedThreads("onos-device-installer-%d"));
+
+    protected ExecutorService operationsService =
+            Executors.newFixedThreadPool(32, namedThreads("onos-flowservice-operations-%d"));
+
+    private IdGenerator idGenerator;
+
+    private Map<Long, FlowOperationsProcessor> pendingFlowOperations = new
+            ConcurrentHashMap<>();
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected FlowRuleStore store;
@@ -101,10 +111,15 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DeviceService deviceService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected CoreService coreService;
+
     @Activate
     public void activate() {
-        futureService =
-                Executors.newFixedThreadPool(32, namedThreads("onos-provider-future-listeners-%d"));
+
+        idGenerator = coreService.getIdGenerator(FLOW_OP_TOPIC);
+
+
         store.setDelegate(delegate);
         eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
         log.info("Started");
@@ -112,8 +127,8 @@
 
     @Deactivate
     public void deactivate() {
-        futureService.shutdownNow();
-
+        deviceInstallers.shutdownNow();
+        operationsService.shutdownNow();
         store.unsetDelegate(delegate);
         eventDispatcher.removeSink(FlowRuleEvent.class);
         log.info("Stopped");
@@ -131,20 +146,20 @@
 
     @Override
     public void applyFlowRules(FlowRule... flowRules) {
-        Set<FlowRuleBatchEntry> toAddBatchEntries = Sets.newHashSet();
+        FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
         for (int i = 0; i < flowRules.length; i++) {
-            toAddBatchEntries.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, flowRules[i]));
+            builder.add(flowRules[i]);
         }
-        applyBatch(new FlowRuleBatchOperation(toAddBatchEntries));
+        apply(builder.build());
     }
 
     @Override
     public void removeFlowRules(FlowRule... flowRules) {
-        Set<FlowRuleBatchEntry> toRemoveBatchEntries = Sets.newHashSet();
+        FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
         for (int i = 0; i < flowRules.length; i++) {
-            toRemoveBatchEntries.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, flowRules[i]));
+            builder.remove(flowRules[i]);
         }
-        applyBatch(new FlowRuleBatchOperation(toRemoveBatchEntries));
+        apply(builder.build());
     }
 
     @Override
@@ -180,23 +195,38 @@
     }
 
     @Override
-    public Future<CompletedBatchOperation> applyBatch(
-            FlowRuleBatchOperation batch) {
-        Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches =
-                ArrayListMultimap.create();
-        List<Future<CompletedBatchOperation>> futures = Lists.newArrayList();
-        for (FlowRuleBatchEntry fbe : batch.getOperations()) {
-            final FlowRule f = fbe.target();
-            perDeviceBatches.put(f.deviceId(), fbe);
-        }
+    public Future<CompletedBatchOperation> applyBatch(FlowRuleBatchOperation batch) {
 
-        for (DeviceId deviceId : perDeviceBatches.keySet()) {
-            FlowRuleBatchOperation b =
-                    new FlowRuleBatchOperation(perDeviceBatches.get(deviceId));
-            Future<CompletedBatchOperation> future = store.storeBatch(b);
-            futures.add(future);
-        }
-        return new FlowRuleBatchFuture(futures, perDeviceBatches);
+
+        FlowRuleOperations.Builder fopsBuilder = FlowRuleOperations.builder();
+        batch.getOperations().stream().forEach(op -> {
+                        switch (op.getOperator()) {
+                            case ADD:
+                                fopsBuilder.add(op.getTarget());
+                                break;
+                            case REMOVE:
+                                fopsBuilder.remove(op.getTarget());
+                                break;
+                            case MODIFY:
+                                fopsBuilder.modify(op.getTarget());
+                                break;
+                            default:
+                                log.warn("Unknown flow operation operator: {}", op.getOperator());
+
+                        }
+                }
+        );
+
+        apply(fopsBuilder.build());
+        return Futures.immediateFuture(
+                new CompletedBatchOperation(true,
+                                            Collections.emptySet(), null));
+
+    }
+
+    @Override
+    public void apply(FlowRuleOperations ops) {
+        operationsService.submit(new FlowOperationsProcessor(ops));
     }
 
     @Override
@@ -373,13 +403,19 @@
 
             }
         }
+
+        @Override
+        public void batchOperationCompleted(long batchId, CompletedBatchOperation operation) {
+            store.batchOperationComplete(FlowRuleBatchEvent.completed(
+                    new FlowRuleBatchRequest(batchId, Collections.emptySet()),
+                    operation
+            ));
+        }
     }
 
     // Store delegate to re-post events emitted from the store.
     private class InternalStoreDelegate implements FlowRuleStoreDelegate {
 
-        // FIXME set appropriate default and make it configurable
-        private static final int TIMEOUT_PER_OP = 500; // ms
 
         // TODO: Right now we only dispatch events at individual flowEntry level.
         // It may be more efficient for also dispatch events as a batch.
@@ -389,47 +425,55 @@
             switch (event.type()) {
             case BATCH_OPERATION_REQUESTED:
                 // Request has been forwarded to MASTER Node, and was
-                for (FlowRule entry : request.toAdd()) {
-                    eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED, entry));
-                }
-                for (FlowRule entry : request.toRemove()) {
-                    eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVE_REQUESTED, entry));
-                }
-                // FIXME: what about op.equals(FlowRuleOperation.MODIFY) ?
+                request.ops().stream().forEach(
+                        op -> {
+                            switch (op.getOperator()) {
 
-                FlowRuleBatchOperation batchOperation = request.asBatchOperation();
+                                case ADD:
+                                    eventDispatcher.post(
+                                            new FlowRuleEvent(
+                                                    FlowRuleEvent.Type.RULE_ADD_REQUESTED,
+                                                    op.getTarget()));
+                                    break;
+                                case REMOVE:
+                                    eventDispatcher.post(
+                                            new FlowRuleEvent(
+                                                    FlowRuleEvent.Type.RULE_REMOVE_REQUESTED,
+                                                    op.getTarget()));
+                                    break;
+                                case MODIFY:
+                                    //TODO: do something here when the time comes.
+                                    break;
+                                default:
+                                    log.warn("Unknown flow operation operator: {}", op.getOperator());
+                            }
+                        }
+                );
+
+                DeviceId deviceId = event.deviceId();
+
+                FlowRuleBatchOperation batchOperation =
+                        request.asBatchOperation(deviceId);
 
                 FlowRuleProvider flowRuleProvider =
-                        getProvider(batchOperation.getOperations().get(0).target().deviceId());
-                final Future<CompletedBatchOperation> result =
-                        flowRuleProvider.executeBatch(batchOperation);
-                futureService.submit(new Runnable() {
-                    @Override
-                    public void run() {
-                        CompletedBatchOperation res;
-                        try {
-                            res = result.get(TIMEOUT_PER_OP * batchOperation.size(), TimeUnit.MILLISECONDS);
-                            store.batchOperationComplete(FlowRuleBatchEvent.completed(request, res));
-                        } catch (TimeoutException | InterruptedException | ExecutionException e) {
-                            log.warn("Something went wrong with the batch operation {}",
-                                     request.batchId(), e);
+                        getProvider(deviceId);
 
-                            Set<FlowRule> failures = new HashSet<>(batchOperation.size());
-                            for (FlowRuleBatchEntry op : batchOperation.getOperations()) {
-                                failures.add(op.target());
-                            }
-                            res = new CompletedBatchOperation(false, failures);
-                            store.batchOperationComplete(FlowRuleBatchEvent.completed(request, res));
-                        }
-                    }
-                });
+                flowRuleProvider.executeBatch(batchOperation);
+
                 break;
 
             case BATCH_OPERATION_COMPLETED:
-                // MASTER Node has pushed the batch down to the Device
 
-                // Note: RULE_ADDED will be posted
-                // when Flow was actually confirmed by stats reply.
+                FlowOperationsProcessor fops = pendingFlowOperations.remove(
+                        event.subject().batchId());
+                if (event.result().isSuccess()) {
+                    if (fops != null) {
+                        fops.satisfy(event.deviceId());
+                    }
+                } else {
+                    fops.fail(event.deviceId(), event.result().failedItems());
+                }
+
                 break;
 
             default:
@@ -438,141 +482,100 @@
         }
     }
 
-    private class FlowRuleBatchFuture implements Future<CompletedBatchOperation> {
+    private class FlowOperationsProcessor implements Runnable {
 
-        private final List<Future<CompletedBatchOperation>> futures;
-        private final Multimap<DeviceId, FlowRuleBatchEntry> batches;
-        private final AtomicReference<BatchState> state;
-        private CompletedBatchOperation overall;
+        private final List<Set<FlowRuleOperation>> stages;
+        private final FlowRuleOperationsContext context;
+        private final FlowRuleOperations fops;
+        private final AtomicBoolean hasFailed = new AtomicBoolean(false);
 
-        public FlowRuleBatchFuture(List<Future<CompletedBatchOperation>> futures,
-                Multimap<DeviceId, FlowRuleBatchEntry> batches) {
-            this.futures = futures;
-            this.batches = batches;
-            this.state = new AtomicReference<>(BatchState.STARTED);
-        }
+        private Set<DeviceId> pendingDevices;
 
-        @Override
-        public boolean cancel(boolean mayInterruptIfRunning) {
-            if (state.get() == BatchState.FINISHED) {
-                return false;
-            }
-            if (log.isDebugEnabled()) {
-                log.debug("Cancelling FlowRuleBatchFuture",
-                          new RuntimeException("Just printing backtrace"));
-            }
-            if (!state.compareAndSet(BatchState.STARTED, BatchState.CANCELLED)) {
-                return false;
-            }
-            cleanUpBatch();
-            for (Future<CompletedBatchOperation> f : futures) {
-                f.cancel(mayInterruptIfRunning);
-            }
-            return true;
-        }
+        public FlowOperationsProcessor(FlowRuleOperations ops) {
 
-        @Override
-        public boolean isCancelled() {
-            return state.get() == BatchState.CANCELLED;
-        }
+            this.stages = Lists.newArrayList(ops.stages());
+            this.context = ops.callback();
+            this.fops = ops;
+            pendingDevices = Sets.newConcurrentHashSet();
 
-        @Override
-        public boolean isDone() {
-            return state.get() == BatchState.FINISHED;
-        }
-
-
-        @Override
-        public CompletedBatchOperation get() throws InterruptedException,
-            ExecutionException {
-
-            if (isDone()) {
-                return overall;
-            }
-
-            boolean success = true;
-            Set<FlowRule> failed = Sets.newHashSet();
-            Set<Long> failedIds = Sets.newHashSet();
-            CompletedBatchOperation completed;
-            for (Future<CompletedBatchOperation> future : futures) {
-                completed = future.get();
-                success = validateBatchOperation(failed, failedIds, completed);
-            }
-
-            return finalizeBatchOperation(success, failed, failedIds);
 
         }
 
         @Override
-        public CompletedBatchOperation get(long timeout, TimeUnit unit)
-                throws InterruptedException, ExecutionException,
-                TimeoutException {
-
-            if (isDone()) {
-                return overall;
-            }
-            boolean success = true;
-            Set<FlowRule> failed = Sets.newHashSet();
-            Set<Long> failedIds = Sets.newHashSet();
-            CompletedBatchOperation completed;
-            for (Future<CompletedBatchOperation> future : futures) {
-                completed = future.get(timeout, unit);
-                success = validateBatchOperation(failed, failedIds, completed);
-            }
-            return finalizeBatchOperation(success, failed, failedIds);
-        }
-
-        private boolean validateBatchOperation(Set<FlowRule> failed,
-                                               Set<Long> failedIds,
-                                               CompletedBatchOperation completed) {
-
-            if (isCancelled()) {
-                throw new CancellationException();
-            }
-            if (!completed.isSuccess()) {
-                log.warn("FlowRuleBatch failed: {}", completed);
-                failed.addAll(completed.failedItems());
-                failedIds.addAll(completed.failedIds());
-                cleanUpBatch();
-                cancelAllSubBatches();
-                return false;
-            }
-            return true;
-        }
-
-        private void cancelAllSubBatches() {
-            for (Future<CompletedBatchOperation> f : futures) {
-                f.cancel(true);
+        public void run() {
+            if (stages.size() > 0) {
+                process(stages.remove(0));
+            } else if (!hasFailed.get() && context != null) {
+                context.onSuccess(fops);
             }
         }
 
-        private CompletedBatchOperation finalizeBatchOperation(boolean success,
-                                                               Set<FlowRule> failed,
-                                                               Set<Long> failedIds) {
-            synchronized (this) {
-                if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
-                    if (state.get() == BatchState.FINISHED) {
-                        return overall;
+        private void process(Set<FlowRuleOperation> ops) {
+            Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches =
+                    ArrayListMultimap.create();
+
+            FlowRuleBatchEntry fbe;
+            for (FlowRuleOperation flowRuleOperation : ops) {
+                switch (flowRuleOperation.type()) {
+                    // FIXME: Brian needs imagination when creating class names.
+                    case ADD:
+                        fbe = new FlowRuleBatchEntry(
+                                FlowRuleBatchEntry.FlowRuleOperation.ADD, flowRuleOperation.rule());
+                        break;
+                    case MODIFY:
+                        fbe = new FlowRuleBatchEntry(
+                                FlowRuleBatchEntry.FlowRuleOperation.MODIFY, flowRuleOperation.rule());
+                        break;
+                    case REMOVE:
+                        fbe = new FlowRuleBatchEntry(
+                                FlowRuleBatchEntry.FlowRuleOperation.REMOVE, flowRuleOperation.rule());
+                        break;
+                    default:
+                        throw new UnsupportedOperationException("Unknown flow rule type " + flowRuleOperation.type());
+                }
+                pendingDevices.add(flowRuleOperation.rule().deviceId());
+                perDeviceBatches.put(flowRuleOperation.rule().deviceId(), fbe);
+            }
+
+
+            for (DeviceId deviceId : perDeviceBatches.keySet()) {
+                Long id = idGenerator.getNewId();
+                final FlowRuleBatchOperation b = new FlowRuleBatchOperation(perDeviceBatches.get(deviceId),
+                                               deviceId, id);
+                pendingFlowOperations.put(id, this);
+                deviceInstallers.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        store.storeBatch(b);
                     }
-                    throw new CancellationException();
-                }
-                overall = new CompletedBatchOperation(success, failed, failedIds);
-                return overall;
+                });
             }
         }
 
-        private void cleanUpBatch() {
-            log.debug("cleaning up batch");
-            // TODO convert these into a batch?
-            for (FlowRuleBatchEntry fbe : batches.values()) {
-                if (fbe.operator() == FlowRuleOperation.ADD ||
-                    fbe.operator() == FlowRuleOperation.MODIFY) {
-                    store.deleteFlowRule(fbe.target());
-                } else if (fbe.operator() == FlowRuleOperation.REMOVE) {
-                    store.removeFlowRule(new DefaultFlowEntry(fbe.target()));
-                    store.storeFlowRule(fbe.target());
-                }
+        public void satisfy(DeviceId devId) {
+            pendingDevices.remove(devId);
+            if (pendingDevices.isEmpty()) {
+                operationsService.submit(this);
             }
         }
+
+
+
+        public void fail(DeviceId devId, Set<? extends FlowRule> failures) {
+            hasFailed.set(true);
+            pendingDevices.remove(devId);
+            if (pendingDevices.isEmpty()) {
+                operationsService.submit(this);
+            }
+
+            if (context != null) {
+                final FlowRuleOperations.Builder failedOpsBuilder =
+                    FlowRuleOperations.builder();
+                failures.stream().forEach(failedOpsBuilder::add);
+
+                context.onError(failedOpsBuilder.build());
+            }
+        }
+
     }
 }
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
index 8b6d8ad..685fa70 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
@@ -1151,6 +1151,7 @@
          */
         protected Future<CompletedBatchOperation> applyNextBatch(List<CompletedIntentUpdate> updates) {
             //TODO test this. (also, maybe save this batch)
+
             FlowRuleBatchOperation batch = createFlowRuleBatchOperation(updates);
             if (batch.size() > 0) {
                 //FIXME apply batch might throw an exception
@@ -1165,7 +1166,7 @@
         }
 
         private FlowRuleBatchOperation createFlowRuleBatchOperation(List<CompletedIntentUpdate> intentUpdates) {
-            FlowRuleBatchOperation batch = new FlowRuleBatchOperation(Collections.emptyList());
+            FlowRuleBatchOperation batch = new FlowRuleBatchOperation(Collections.emptyList(), null, 0);
             for (CompletedIntentUpdate update : intentUpdates) {
                 FlowRuleBatchOperation currentBatch = update.currentBatch();
                 if (currentBatch != null) {
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/LinkCollectionIntentInstaller.java b/core/net/src/main/java/org/onosproject/net/intent/impl/LinkCollectionIntentInstaller.java
index 583d7e3..816fc12 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/LinkCollectionIntentInstaller.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/LinkCollectionIntentInstaller.java
@@ -98,6 +98,7 @@
             outputPorts.put(egressPoint.deviceId(), egressPoint.port());
         }
 
+        //FIXME change to new api
         FlowRuleBatchOperation batchOperation =
                 new FlowRuleBatchOperation(outputPorts
                         .keys()
@@ -105,7 +106,7 @@
                         .map(deviceId -> createBatchEntry(operation,
                                                    intent, deviceId,
                                                    outputPorts.get(deviceId)))
-                        .collect(Collectors.toList()));
+                        .collect(Collectors.toList()), null, 0);
 
         return Collections.singletonList(batchOperation);
     }
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/OpticalPathIntentInstaller.java b/core/net/src/main/java/org/onosproject/net/intent/impl/OpticalPathIntentInstaller.java
index e958c58..5b226a3 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/OpticalPathIntentInstaller.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/OpticalPathIntentInstaller.java
@@ -181,6 +181,7 @@
                                             true);
         rules.add(new FlowRuleBatchEntry(operation, rule));
 
-        return Lists.newArrayList(new FlowRuleBatchOperation(rules));
+        //FIXME change to new api
+        return Lists.newArrayList(new FlowRuleBatchOperation(rules, null, 0));
     }
 }
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/PathIntentInstaller.java b/core/net/src/main/java/org/onosproject/net/intent/impl/PathIntentInstaller.java
index 1c8ea7e..b90be50 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/PathIntentInstaller.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/PathIntentInstaller.java
@@ -108,7 +108,8 @@
                                              intent.id().fingerprint()));
             prev = link.dst();
         }
-        return Lists.newArrayList(new FlowRuleBatchOperation(rules));
+        //FIXME this should change to new api.
+        return Lists.newArrayList(new FlowRuleBatchOperation(rules, null, 0));
     }
 
     @Override
@@ -138,7 +139,8 @@
                                              intent.id().fingerprint()));
             prev = link.dst();
         }
-        return Lists.newArrayList(new FlowRuleBatchOperation(rules));
+        // FIXME this should change to new api
+        return Lists.newArrayList(new FlowRuleBatchOperation(rules, null, 0));
     }
 
     @Override
diff --git a/core/net/src/test/java/org/onosproject/event/impl/TestEventDispatcher.java b/core/net/src/test/java/org/onosproject/event/impl/TestEventDispatcher.java
index c3f9ba0..87fc585 100644
--- a/core/net/src/test/java/org/onosproject/event/impl/TestEventDispatcher.java
+++ b/core/net/src/test/java/org/onosproject/event/impl/TestEventDispatcher.java
@@ -31,7 +31,7 @@
 
     @Override
     @SuppressWarnings("unchecked")
-    public void post(Event event) {
+    public synchronized void post(Event event) {
         EventSink sink = getSink(event.getClass());
         checkState(sink != null, "No sink for event %s", event);
         sink.process(event);
diff --git a/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java b/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java
index 5a79b43..3fd8b5a 100644
--- a/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java
@@ -20,12 +20,15 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ListenableFuture;
-
+import com.google.common.util.concurrent.MoreExecutors;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
 import org.onosproject.core.DefaultApplicationId;
+import org.onosproject.core.IdGenerator;
+import org.onosproject.core.Version;
 import org.onosproject.event.impl.TestEventDispatcher;
 import org.onosproject.net.DefaultDevice;
 import org.onosproject.net.Device;
@@ -36,7 +39,6 @@
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.device.DeviceListener;
 import org.onosproject.net.device.DeviceServiceAdapter;
-import org.onosproject.net.flow.BatchOperation;
 import org.onosproject.net.flow.CompletedBatchOperation;
 import org.onosproject.net.flow.DefaultFlowEntry;
 import org.onosproject.net.flow.DefaultFlowRule;
@@ -72,6 +74,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.junit.Assert.*;
 import static org.onosproject.net.flow.FlowRuleEvent.Type.*;
@@ -97,12 +100,16 @@
     protected TestListener listener = new TestListener();
     private ApplicationId appId;
 
+
     @Before
     public void setUp() {
         mgr = new FlowRuleManager();
         mgr.store = new SimpleFlowRuleStore();
         mgr.eventDispatcher = new TestEventDispatcher();
         mgr.deviceService = new TestDeviceService();
+        mgr.coreService = new TestCoreService();
+        mgr.operationsService = MoreExecutors.newDirectExecutorService();
+        mgr.deviceInstallers = MoreExecutors.newDirectExecutorService();
         service = mgr;
         registry = mgr;
 
@@ -246,14 +253,23 @@
 
     @Test
     public void flowRemoved() {
+
         FlowRule f1 = addFlowRule(1);
         FlowRule f2 = addFlowRule(2);
         StoredFlowEntry fe1 = new DefaultFlowEntry(f1);
         FlowEntry fe2 = new DefaultFlowEntry(f2);
+
+
         providerService.pushFlowMetrics(DID, ImmutableList.of(fe1, fe2));
         service.removeFlowRules(f1);
+
         fe1.setState(FlowEntryState.REMOVED);
+
+
+
         providerService.flowRemoved(fe1);
+
+
         validateEvents(RULE_ADD_REQUESTED, RULE_ADD_REQUESTED, RULE_ADDED,
                        RULE_ADDED, RULE_REMOVE_REQUESTED, RULE_REMOVED);
 
@@ -263,11 +279,13 @@
         FlowRule f3 = flowRule(3, 3);
         FlowEntry fe3 = new DefaultFlowEntry(f3);
         service.applyFlowRules(f3);
+
         providerService.pushFlowMetrics(DID, Collections.singletonList(fe3));
         validateEvents(RULE_ADD_REQUESTED, RULE_ADDED);
 
         providerService.flowRemoved(fe3);
         validateEvents();
+
     }
 
     @Test
@@ -281,7 +299,6 @@
         FlowEntry fe1 = new DefaultFlowEntry(f1);
         FlowEntry fe2 = new DefaultFlowEntry(f2);
 
-
         //FlowRule updatedF1 = flowRule(f1, FlowRuleState.ADDED);
         //FlowRule updatedF2 = flowRule(f2, FlowRuleState.ADDED);
 
@@ -388,7 +405,7 @@
                 FlowRuleBatchEntry.FlowRuleOperation.ADD, f2);
 
         FlowRuleBatchOperation fbo = new FlowRuleBatchOperation(
-                Lists.newArrayList(fbe1, fbe2));
+                Lists.newArrayList(fbe1, fbe2), null, 0);
         Future<CompletedBatchOperation> future = mgr.applyBatch(fbo);
         assertTrue("Entries in wrong state",
                    validateState(ImmutableMap.of(
@@ -406,53 +423,6 @@
 
     }
 
-    @Test
-    public void cancelBatch() {
-        FlowRule f1 = flowRule(1, 1);
-        FlowRule f2 = flowRule(2, 2);
-
-
-        mgr.applyFlowRules(f1);
-
-        assertTrue("Entries in wrong state",
-                   validateState(ImmutableMap.of(
-                           f1, FlowEntryState.PENDING_ADD)));
-
-        FlowEntry fe1 = new DefaultFlowEntry(f1);
-        providerService.pushFlowMetrics(DID, Collections.<FlowEntry>singletonList(fe1));
-
-        assertTrue("Entries in wrong state",
-                   validateState(ImmutableMap.of(
-                           f1, FlowEntryState.ADDED)));
-
-
-        FlowRuleBatchEntry fbe1 = new FlowRuleBatchEntry(
-                FlowRuleBatchEntry.FlowRuleOperation.REMOVE, f1);
-
-        FlowRuleBatchEntry fbe2 = new FlowRuleBatchEntry(
-                FlowRuleBatchEntry.FlowRuleOperation.ADD, f2);
-
-        FlowRuleBatchOperation fbo = new FlowRuleBatchOperation(
-                Lists.newArrayList(fbe1, fbe2));
-        Future<CompletedBatchOperation> future = mgr.applyBatch(fbo);
-
-        future.cancel(true);
-
-        assertTrue(flowCount() == 2);
-
-        /*
-         * Rule f1 should be re-added to the list and therefore be in a pending add
-         * state.
-         */
-        assertTrue("Entries in wrong state",
-                   validateState(ImmutableMap.of(
-                           f2, FlowEntryState.PENDING_REMOVE,
-                           f1, FlowEntryState.PENDING_ADD)));
-
-
-    }
-
-
     private static class TestListener implements FlowRuleListener {
         final List<FlowRuleEvent> events = new ArrayList<>();
 
@@ -528,9 +498,8 @@
         }
 
         @Override
-        public ListenableFuture<CompletedBatchOperation> executeBatch(
-                BatchOperation<FlowRuleBatchEntry> batch) {
-            return new TestInstallationFuture();
+        public void executeBatch(FlowRuleBatchOperation batch) {
+         // TODO: need to call batchOperationComplete
         }
 
         private class TestInstallationFuture
@@ -554,14 +523,14 @@
             @Override
             public CompletedBatchOperation get()
                     throws InterruptedException, ExecutionException {
-                return new CompletedBatchOperation(true, Collections.<FlowRule>emptySet());
+                return new CompletedBatchOperation(true, Collections.<FlowRule>emptySet(), null);
             }
 
             @Override
             public CompletedBatchOperation get(long timeout, TimeUnit unit)
                     throws InterruptedException,
                     ExecutionException, TimeoutException {
-                return new CompletedBatchOperation(true, Collections.<FlowRule>emptySet());
+                return new CompletedBatchOperation(true, Collections.<FlowRule>emptySet(), null);
             }
 
             @Override
@@ -644,4 +613,37 @@
         }
     }
 
+    private class TestCoreService implements CoreService {
+        @Override
+        public Version version() {
+            return null;
+        }
+
+        @Override
+        public Set<ApplicationId> getAppIds() {
+            return null;
+        }
+
+        @Override
+        public ApplicationId getAppId(Short id) {
+            return null;
+        }
+
+        @Override
+        public ApplicationId registerApplication(String identifier) {
+            return null;
+        }
+
+        @Override
+        public IdGenerator getIdGenerator(String topic) {
+            return new IdGenerator() {
+                private AtomicLong counter = new AtomicLong(0);
+                @Override
+                public long getNewId() {
+                    return counter.getAndIncrement();
+                }
+            };
+        }
+    }
+
 }
diff --git a/core/net/src/test/java/org/onosproject/net/intent/impl/IntentManagerTest.java b/core/net/src/test/java/org/onosproject/net/intent/impl/IntentManagerTest.java
index 17a9498..3570c77 100644
--- a/core/net/src/test/java/org/onosproject/net/intent/impl/IntentManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/intent/impl/IntentManagerTest.java
@@ -201,7 +201,7 @@
             FlowRule fr = new IntentTestsMocks.MockFlowRule(intent.number().intValue());
             List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
             rules.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, fr));
-            return Lists.newArrayList(new FlowRuleBatchOperation(rules));
+            return Lists.newArrayList(new FlowRuleBatchOperation(rules, fr.deviceId(), 0));
         }
 
         @Override
@@ -209,7 +209,7 @@
             FlowRule fr = new IntentTestsMocks.MockFlowRule(intent.number().intValue());
             List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
             rules.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, fr));
-            return Lists.newArrayList(new FlowRuleBatchOperation(rules));
+            return Lists.newArrayList(new FlowRuleBatchOperation(rules, fr.deviceId(), 0));
         }
 
         @Override
@@ -219,7 +219,7 @@
             List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
             rules.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, fr));
             rules.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, fr2));
-            return Lists.newArrayList(new FlowRuleBatchOperation(rules));
+            return Lists.newArrayList(new FlowRuleBatchOperation(rules, fr.deviceId(), 0));
         }
     }
 
diff --git a/core/net/src/test/java/org/onosproject/net/intent/impl/MockFlowRuleService.java b/core/net/src/test/java/org/onosproject/net/intent/impl/MockFlowRuleService.java
index e956575..72de4f0 100644
--- a/core/net/src/test/java/org/onosproject/net/intent/impl/MockFlowRuleService.java
+++ b/core/net/src/test/java/org/onosproject/net/intent/impl/MockFlowRuleService.java
@@ -27,6 +27,7 @@
 import org.onosproject.net.flow.FlowRuleBatchEntry;
 import org.onosproject.net.flow.FlowRuleBatchOperation;
 import org.onosproject.net.flow.FlowRuleListener;
+import org.onosproject.net.flow.FlowRuleOperations;
 import org.onosproject.net.flow.FlowRuleService;
 
 import com.google.common.collect.ImmutableSet;
@@ -45,11 +46,11 @@
 
     public void setFuture(boolean success, long intentId) {
         if (success) {
-            future = Futures.immediateFuture(new CompletedBatchOperation(true, Collections.emptySet()));
+            future = Futures.immediateFuture(new CompletedBatchOperation(true, Collections.emptySet(), null));
         } else {
             final Set<Long> failedIds = ImmutableSet.of(intentId);
             future = Futures.immediateFuture(
-                    new CompletedBatchOperation(false, flows, failedIds));
+                    new CompletedBatchOperation(false, flows, failedIds, null));
         }
     }
 
@@ -74,6 +75,11 @@
     }
 
     @Override
+    public void apply(FlowRuleOperations ops) {
+
+    }
+
+    @Override
     public int getFlowRuleCount() {
         return flows.size();
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index 09aa401..3d66386 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -15,33 +15,15 @@
  */
 package org.onosproject.store.flow.impl;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onlab.util.Tools.namedThreads;
-import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
-import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.APPLY_BATCH_FLOWS;
-import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES;
-import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.GET_FLOW_ENTRY;
-import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.REMOVE_FLOW_ENTRY;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.hazelcast.core.IMap;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -49,8 +31,11 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.util.KryoNamespace;
+import org.onlab.util.NewConcurrentHashMap;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.core.CoreService;
+import org.onosproject.core.IdGenerator;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.device.DeviceService;
@@ -67,6 +52,7 @@
 import org.onosproject.net.flow.FlowRuleBatchRequest;
 import org.onosproject.net.flow.FlowRuleEvent;
 import org.onosproject.net.flow.FlowRuleEvent.Type;
+import org.onosproject.net.flow.FlowRuleService;
 import org.onosproject.net.flow.FlowRuleStore;
 import org.onosproject.net.flow.FlowRuleStoreDelegate;
 import org.onosproject.net.flow.StoredFlowEntry;
@@ -79,27 +65,37 @@
 import org.onosproject.store.flow.ReplicaInfoService;
 import org.onosproject.store.hz.AbstractHazelcastStore;
 import org.onosproject.store.hz.SMap;
-import org.onosproject.store.serializers.DecodeTo;
 import org.onosproject.store.serializers.KryoSerializer;
 import org.onosproject.store.serializers.StoreSerializer;
 import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
 import org.slf4j.Logger;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import com.hazelcast.core.IMap;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
+import static org.onlab.util.Tools.namedThreads;
+import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
+import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
+import static org.slf4j.LoggerFactory.getLogger;
 
 /**
  * Manages inventory of flow rules using a distributed state management protocol.
@@ -112,12 +108,10 @@
 
     private final Logger log = getLogger(getClass());
 
-    // primary data:
-    //  read/write needs to be locked
-    private final ReentrantReadWriteLock flowEntriesLock = new ReentrantReadWriteLock();
-    // store entries as a pile of rules, no info about device tables
-    private final Multimap<DeviceId, StoredFlowEntry> flowEntries
-        = ArrayListMultimap.<DeviceId, StoredFlowEntry>create();
+    private InternalFlowTable flowTable = new InternalFlowTable();
+
+    /*private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
+            flowEntries = new ConcurrentHashMap<>();*/
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ReplicaInfoService replicaInfoManager;
@@ -131,23 +125,15 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DeviceService deviceService;
 
-    private final AtomicInteger localBatchIdGen = new AtomicInteger();
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected CoreService coreService;
 
-    private int pendingFutureTimeoutMinutes = 5;
-
-    private Cache<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures =
-            CacheBuilder.newBuilder()
-                .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
-                .removalListener(new TimeoutFuture())
-                .build();
+    private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
 
     // Cache of SMaps used for backup data.  each SMap contain device flow table
     private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps;
 
 
-    private final ExecutorService futureListeners =
-            Executors.newCachedThreadPool(namedThreads("onos-flowstore-peer-responders"));
-
     private final ExecutorService backupExecutors =
             Executors.newSingleThreadExecutor(namedThreads("onos-async-backups"));
 
@@ -169,6 +155,8 @@
 
     private ReplicaInfoEventListener replicaInfoEventListener;
 
+    private IdGenerator idGenerator;
+
     @Override
     @Activate
     public void activate() {
@@ -176,22 +164,33 @@
         super.serializer = SERIALIZER;
         super.theInstance = storeService.getHazelcastInstance();
 
+        idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
+
         // Cache to create SMap on demand
         smaps = CacheBuilder.newBuilder()
-                    .softValues()
-                    .build(new SMapLoader());
+                .softValues()
+                .build(new SMapLoader());
 
         final NodeId local = clusterService.getLocalNode().id();
 
         clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local));
 
+        clusterCommunicator.addSubscriber(REMOTE_APPLY_COMPLETED, new ClusterMessageHandler() {
+            @Override
+            public void handle(ClusterMessage message) {
+                FlowRuleBatchEvent event = SERIALIZER.decode(message.payload());
+                log.trace("received completed notification for {}", event);
+                notifyDelegate(event);
+            }
+        });
+
         clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
 
             @Override
             public void handle(ClusterMessage message) {
                 FlowRule rule = SERIALIZER.decode(message.payload());
                 log.trace("received get flow entry request for {}", rule);
-                FlowEntry flowEntry = getFlowEntryInternal(rule);
+                FlowEntry flowEntry = flowTable.getFlowEntry(rule); //getFlowEntryInternal(rule);
                 try {
                     message.respond(SERIALIZER.encode(flowEntry));
                 } catch (IOException e) {
@@ -206,7 +205,7 @@
             public void handle(ClusterMessage message) {
                 DeviceId deviceId = SERIALIZER.decode(message.payload());
                 log.trace("Received get flow entries request for {} from {}", deviceId, message.sender());
-                Set<FlowEntry> flowEntries = getFlowEntriesInternal(deviceId);
+                Set<FlowEntry> flowEntries = flowTable.getFlowEntries(deviceId);
                 try {
                     message.respond(SERIALIZER.encode(flowEntries));
                 } catch (IOException e) {
@@ -272,11 +271,11 @@
         }
 
         if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
-            return getFlowEntryInternal(rule);
+            return flowTable.getFlowEntry(rule);
         }
 
         log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
-                replicaInfo.master().orNull(), rule.deviceId());
+                  replicaInfo.master().orNull(), rule.deviceId());
 
         ClusterMessage message = new ClusterMessage(
                 clusterService.getLocalNode().id(),
@@ -292,19 +291,7 @@
         return null;
     }
 
-    private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
-        flowEntriesLock.readLock().lock();
-        try {
-            for (StoredFlowEntry f : flowEntries.get(rule.deviceId())) {
-                if (f.equals(rule)) {
-                    return f;
-                }
-            }
-        } finally {
-            flowEntriesLock.readLock().unlock();
-        }
-        return null;
-    }
+
 
     @Override
     public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
@@ -317,11 +304,11 @@
         }
 
         if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
-            return getFlowEntriesInternal(deviceId);
+            return flowTable.getFlowEntries(deviceId);
         }
 
         log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
-                replicaInfo.master().orNull(), deviceId);
+                  replicaInfo.master().orNull(), deviceId);
 
         ClusterMessage message = new ClusterMessage(
                 clusterService.getLocalNode().id(),
@@ -337,30 +324,26 @@
         return Collections.emptyList();
     }
 
-    private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
-        flowEntriesLock.readLock().lock();
-        try {
-            Collection<? extends FlowEntry> rules = flowEntries.get(deviceId);
-            if (rules == null) {
-                return Collections.emptySet();
-            }
-            return ImmutableSet.copyOf(rules);
-        } finally {
-            flowEntriesLock.readLock().unlock();
-        }
-    }
+
 
     @Override
     public void storeFlowRule(FlowRule rule) {
-        storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule))));
+        storeBatch(new FlowRuleBatchOperation(
+                Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
+                rule.deviceId(), idGenerator.getNewId()));
     }
 
     @Override
-    public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
+    public void storeBatch(FlowRuleBatchOperation operation) {
+
 
         if (operation.getOperations().isEmpty()) {
-            return Futures.immediateFuture(new CompletedBatchOperation(true,
-                                                                       Collections.<FlowRule>emptySet()));
+
+            notifyDelegate(FlowRuleBatchEvent.completed(
+                    new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+                    new CompletedBatchOperation(true, Collections.emptySet(),
+                                                operation.deviceId())));
+            return;
         }
 
         DeviceId deviceId = operation.getOperations().get(0).target().deviceId();
@@ -369,110 +352,129 @@
 
         if (!replicaInfo.master().isPresent()) {
             log.warn("Failed to storeBatch: No master for {}", deviceId);
-            // TODO: revisit if this should be "success" from Future point of view
-            // with every FlowEntry failed
-            return Futures.immediateFailedFuture(new IOException("Failed to storeBatch: No master for " + deviceId));
+
+            Set<FlowRule> allFailures = operation.getOperations().stream()
+                    .map(op -> op.getTarget())
+                    .collect(Collectors.toSet());
+
+            notifyDelegate(FlowRuleBatchEvent.completed(
+                    new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+                    new CompletedBatchOperation(false, allFailures, operation.deviceId())));
+            return;
         }
 
         final NodeId local = clusterService.getLocalNode().id();
         if (replicaInfo.master().get().equals(local)) {
-            return storeBatchInternal(operation);
+            storeBatchInternal(operation);
+            return;
         }
 
         log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
-                replicaInfo.master().orNull(), deviceId);
+                  replicaInfo.master().orNull(), deviceId);
 
         ClusterMessage message = new ClusterMessage(
                 local,
                 APPLY_BATCH_FLOWS,
                 SERIALIZER.encode(operation));
 
+        //CompletedBatchOperation response;
         try {
             ListenableFuture<byte[]> responseFuture =
                     clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
-            return Futures.transform(responseFuture, new DecodeTo<CompletedBatchOperation>(SERIALIZER));
-        } catch (IOException e) {
-            return Futures.immediateFailedFuture(e);
+            /*response =
+                    Futures.transform(responseFuture,
+                                      new DecodeTo<CompletedBatchOperation>(SERIALIZER))
+                            .get(500 * operation.size(), TimeUnit.MILLISECONDS);
+
+            notifyDelegate(FlowRuleBatchEvent.completed(
+                    new FlowRuleBatchRequest(operation.id(), Collections.emptySet()), response));*/
+
+        } catch (IOException /*| InterruptedException | ExecutionException | TimeoutException*/ e) {
+            log.warn("Failed to storeBatch: {}", e.getMessage());
+
+            Set<FlowRule> allFailures = operation.getOperations().stream()
+                    .map(op -> op.getTarget())
+                    .collect(Collectors.toSet());
+
+            notifyDelegate(FlowRuleBatchEvent.completed(
+                    new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+                    new CompletedBatchOperation(false, allFailures, deviceId)));
+            return;
         }
+
     }
 
-    private ListenableFuture<CompletedBatchOperation>
-                        storeBatchInternal(FlowRuleBatchOperation operation) {
+    private void storeBatchInternal(FlowRuleBatchOperation operation) {
 
-        final List<FlowRuleBatchEntry> toRemove = new ArrayList<>();
-        final List<FlowRuleBatchEntry> toAdd = new ArrayList<>();
-        DeviceId did = null;
+        final DeviceId did = operation.deviceId();
+        //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
+        Set<FlowRuleBatchEntry> currentOps;
 
 
-        flowEntriesLock.writeLock().lock();
-        try {
-            for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
-                FlowRule flowRule = batchEntry.target();
-                FlowRuleOperation op = batchEntry.operator();
-                if (did == null) {
-                    did = flowRule.deviceId();
-                }
-                if (op.equals(FlowRuleOperation.REMOVE)) {
-                    StoredFlowEntry entry = getFlowEntryInternal(flowRule);
-                    if (entry != null) {
-                        entry.setState(FlowEntryState.PENDING_REMOVE);
-                        toRemove.add(batchEntry);
+        currentOps = operation.getOperations().stream().map(
+                op -> {
+                    StoredFlowEntry entry;
+                    switch (op.getOperator()) {
+                        case ADD:
+                            entry = new DefaultFlowEntry(op.getTarget());
+                            // always add requested FlowRule

+                            // Note: 2 equal FlowEntry may have different treatment
+                            flowTable.remove(entry.deviceId(), entry);
+                            flowTable.add(entry);
+
+                            return op;
+                        case REMOVE:
+                            entry = flowTable.getFlowEntry(op.target());
+                            if (entry != null) {
+                                entry.setState(FlowEntryState.PENDING_REMOVE);
+                                return op;
+                            }
+                            break;
+                        case MODIFY:
+                            //TODO: figure this out at some point
+                            break;
+                        default:
+                            log.warn("Unknown flow operation operator: {}", op.getOperator());
                     }
-                } else if (op.equals(FlowRuleOperation.ADD)) {
-                    StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
-                    DeviceId deviceId = flowRule.deviceId();
-                    Collection<StoredFlowEntry> ft = flowEntries.get(deviceId);
-
-                    // always add requested FlowRule
-                    // Note: 2 equal FlowEntry may have different treatment
-                    ft.remove(flowEntry);
-                    ft.add(flowEntry);
-                    toAdd.add(batchEntry);
+                    return null;
                 }
-            }
-            if (toAdd.isEmpty() && toRemove.isEmpty()) {
-                return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
-            }
-
-            // create remote backup copies
-            updateBackup(did, toAdd, toRemove);
-        } finally {
-            flowEntriesLock.writeLock().unlock();
+        ).filter(op -> op != null).collect(Collectors.toSet());
+        if (currentOps.isEmpty()) {
+            batchOperationComplete(FlowRuleBatchEvent.completed(
+                    new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+                    new CompletedBatchOperation(true, Collections.emptySet(), did)));
+            return;
         }
+        updateBackup(did, currentOps);
 
-        SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
-        final int batchId = localBatchIdGen.incrementAndGet();
 
-        pendingFutures.put(batchId, r);
-        notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove)));
+        notifyDelegate(FlowRuleBatchEvent.requested(new
+             FlowRuleBatchRequest(operation.id(), currentOps), operation.deviceId()));
 
-        return r;
+
     }
 
-    private void updateBackup(final DeviceId deviceId,
-                              final List<FlowRuleBatchEntry> toAdd,
-                              final List<FlowRuleBatchEntry> list) {
-
-        Future<?> submit = backupExecutors.submit(new UpdateBackup(deviceId, toAdd, list));
+    private void updateBackup(DeviceId deviceId, final Set<FlowRuleBatchEntry> entries) {
+        Future<?> backup = backupExecutors.submit(new UpdateBackup(deviceId, entries));
 
         if (syncBackup) {
             // wait for backup to complete
             try {
-                submit.get();
+                backup.get();
             } catch (InterruptedException | ExecutionException e) {
                 log.error("Failed to create backups", e);
             }
         }
     }
 
-    private void updateBackup(DeviceId deviceId, List<FlowRuleBatchEntry> toAdd) {
-
-        updateBackup(deviceId, toAdd, Collections.<FlowRuleBatchEntry>emptyList());
-    }
-
     @Override
     public void deleteFlowRule(FlowRule rule) {
-        storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule))));
+        storeBatch(
+                new FlowRuleBatchOperation(
+                        Arrays.asList(
+                                new FlowRuleBatchEntry(
+                                        FlowRuleOperation.REMOVE,
+                                        rule)), rule.deviceId(), idGenerator.getNewId()));
     }
 
     @Override
@@ -484,37 +486,35 @@
         }
 
         log.warn("Tried to update FlowRule {} state,"
-                + " while the Node was not the master.", rule);
+                         + " while the Node was not the master.", rule);
         return null;
     }
 
     private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
         final DeviceId did = rule.deviceId();
 
-        flowEntriesLock.writeLock().lock();
-        try {
-            // check if this new rule is an update to an existing entry
-            StoredFlowEntry stored = getFlowEntryInternal(rule);
-            if (stored != null) {
-                stored.setBytes(rule.bytes());
-                stored.setLife(rule.life());
-                stored.setPackets(rule.packets());
-                if (stored.state() == FlowEntryState.PENDING_ADD) {
-                    stored.setState(FlowEntryState.ADDED);
-                    FlowRuleBatchEntry entry =
-                            new FlowRuleBatchEntry(FlowRuleOperation.ADD, stored);
-                    updateBackup(did, Arrays.asList(entry));
-                    return new FlowRuleEvent(Type.RULE_ADDED, rule);
-                }
-                return new FlowRuleEvent(Type.RULE_UPDATED, rule);
-            }
 
-            // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
-            // TODO: also update backup if the behavior is correct.
-            flowEntries.put(did, new DefaultFlowEntry(rule));
-        } finally {
-            flowEntriesLock.writeLock().unlock();
+        // check if this new rule is an update to an existing entry
+        StoredFlowEntry stored = flowTable.getFlowEntry(rule);
+        if (stored != null) {
+            stored.setBytes(rule.bytes());
+            stored.setLife(rule.life());
+            stored.setPackets(rule.packets());
+            if (stored.state() == FlowEntryState.PENDING_ADD) {
+                stored.setState(FlowEntryState.ADDED);
+                FlowRuleBatchEntry entry =
+                        new FlowRuleBatchEntry(FlowRuleOperation.ADD, stored);
+                updateBackup(did, Sets.newHashSet(entry));
+                return new FlowRuleEvent(Type.RULE_ADDED, rule);
+            }
+            return new FlowRuleEvent(Type.RULE_UPDATED, rule);
         }
+
+        // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
+        // TODO: also update backup if the behavior is correct.
+        flowTable.add(rule);
+
+
         return null;
 
     }
@@ -540,9 +540,9 @@
                   replicaInfo.master().orNull(), deviceId);
 
         ClusterMessage message = new ClusterMessage(
-                  clusterService.getLocalNode().id(),
-                  REMOVE_FLOW_ENTRY,
-                  SERIALIZER.encode(rule));
+                clusterService.getLocalNode().id(),
+                REMOVE_FLOW_ENTRY,
+                SERIALIZER.encode(rule));
 
         try {
             Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
@@ -555,38 +555,42 @@
 
     private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
         final DeviceId deviceId = rule.deviceId();
-        flowEntriesLock.writeLock().lock();
-        try {
-            // This is where one could mark a rule as removed and still keep it in the store.
-            final boolean removed = flowEntries.remove(deviceId, rule);
-            FlowRuleBatchEntry entry =
-                    new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule);
-            updateBackup(deviceId, Collections.<FlowRuleBatchEntry>emptyList(), Arrays.asList(entry));
-            if (removed) {
-                return new FlowRuleEvent(RULE_REMOVED, rule);
-            } else {
-                return null;
-            }
-        } finally {
-            flowEntriesLock.writeLock().unlock();
+        // This is where one could mark a rule as removed and still keep it in the store.
+        final boolean removed = flowTable.remove(deviceId, rule); //flowEntries.remove(deviceId, rule);
+        FlowRuleBatchEntry entry =
+                new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule);
+        updateBackup(deviceId, Sets.newHashSet(entry));
+        if (removed) {
+            return new FlowRuleEvent(RULE_REMOVED, rule);
+        } else {
+            return null;
         }
+
     }
 
     @Override
     public void batchOperationComplete(FlowRuleBatchEvent event) {
-        final Integer batchId = event.subject().batchId();
-        SettableFuture<CompletedBatchOperation> future
-            = pendingFutures.getIfPresent(batchId);
-        if (future != null) {
-            future.set(event.result());
-            pendingFutures.invalidate(batchId);
+        //FIXME: need a per device pending response
+
+        NodeId nodeId = pendingResponses.remove(event.subject().batchId());
+        if (nodeId == null) {
+            notifyDelegate(event);
+        } else {
+            try {
+                ClusterMessage message = new ClusterMessage(
+                        clusterService.getLocalNode().id(),
+                        REMOTE_APPLY_COMPLETED,
+                        SERIALIZER.encode(event));
+                clusterCommunicator.sendAndReceive(message, nodeId);
+            } catch (IOException e) {
+                log.warn("Failed to respond to peer for batch operation result");
+            }
         }
-        notifyDelegate(event);
     }
 
     private void loadFromBackup(final DeviceId did) {
 
-        flowEntriesLock.writeLock().lock();
+
         try {
             log.debug("Loading FlowRules for {} from backups", did);
             SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(did);
@@ -595,38 +599,21 @@
 
                 log.trace("loading {}", e.getValue());
                 for (StoredFlowEntry entry : e.getValue()) {
-                    flowEntries.remove(did, entry);
-                    flowEntries.put(did, entry);
+                    flowTable.getFlowEntriesById(entry).remove(entry);
+                    flowTable.getFlowEntriesById(entry).add(entry);
+
+
                 }
             }
         } catch (ExecutionException e) {
             log.error("Failed to load backup flowtable for {}", did, e);
-        } finally {
-            flowEntriesLock.writeLock().unlock();
         }
     }
 
     private void removeFromPrimary(final DeviceId did) {
-        Collection<StoredFlowEntry> removed = null;
-        flowEntriesLock.writeLock().lock();
-        try {
-            removed = flowEntries.removeAll(did);
-        } finally {
-            flowEntriesLock.writeLock().unlock();
-        }
-        log.trace("removedFromPrimary {}", removed);
+        flowTable.clearDevice(did);
     }
 
-    private static final class TimeoutFuture
-        implements RemovalListener<Integer, SettableFuture<CompletedBatchOperation>> {
-        @Override
-        public void onRemoval(RemovalNotification<Integer, SettableFuture<CompletedBatchOperation>> notification) {
-            // wrapping in ExecutionException to support Future.get
-            notification.getValue()
-                .setException(new ExecutionException("Timed out",
-                                                     new TimeoutException()));
-        }
-    }
 
     private final class OnStoreBatch implements ClusterMessageHandler {
         private final NodeId local;
@@ -640,7 +627,7 @@
             FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
             log.debug("received batch request {}", operation);
 
-            final DeviceId deviceId = operation.getOperations().get(0).target().deviceId();
+            final DeviceId deviceId = operation.deviceId();
             ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
             if (!local.equals(replicaInfo.master().orNull())) {
 
@@ -648,7 +635,7 @@
                 for (FlowRuleBatchEntry op : operation.getOperations()) {
                     failures.add(op.target());
                 }
-                CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures);
+                CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
                 // This node is no longer the master, respond as all failed.
                 // TODO: we might want to wrap response in envelope
                 // to distinguish sw programming failure and hand over
@@ -661,36 +648,15 @@
                 return;
             }
 
-            final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation);
 
-            f.addListener(new Runnable() {
+            pendingResponses.put(operation.id(), message.sender());
+            storeBatchInternal(operation);
 
-                @Override
-                public void run() {
-                    CompletedBatchOperation result;
-                    try {
-                        result = f.get();
-                    } catch (InterruptedException | ExecutionException e) {
-                        log.error("Batch operation failed", e);
-                        // create everything failed response
-                        Set<FlowRule> failures = new HashSet<>(operation.size());
-                        for (FlowRuleBatchEntry op : operation.getOperations()) {
-                            failures.add(op.target());
-                        }
-                        result = new CompletedBatchOperation(false, failures);
-                    }
-                    try {
-                        message.respond(SERIALIZER.encode(result));
-                    } catch (IOException e) {
-                        log.error("Failed to respond back", e);
-                    }
-                }
-            }, futureListeners);
         }
     }
 
     private final class SMapLoader
-        extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
+            extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
 
         @Override
         public SMap<FlowId, ImmutableList<StoredFlowEntry>> load(DeviceId id)
@@ -701,7 +667,7 @@
     }
 
     private final class InternalReplicaInfoEventListener
-        implements ReplicaInfoEventListener {
+            implements ReplicaInfoEventListener {
 
         @Override
         public void event(ReplicaInfoEvent event) {
@@ -710,98 +676,166 @@
             final ReplicaInfo rInfo = event.replicaInfo();
 
             switch (event.type()) {
-            case MASTER_CHANGED:
-                if (local.equals(rInfo.master().orNull())) {
-                    // This node is the new master, populate local structure
-                    // from backup
-                    loadFromBackup(did);
-                } else {
-                    // This node is no longer the master holder,
-                    // clean local structure
-                    removeFromPrimary(did);
-                    // TODO: probably should stop pending backup activities in
-                    // executors to avoid overwriting with old value
-                }
-                break;
-            default:
-                break;
+                case MASTER_CHANGED:
+                    if (local.equals(rInfo.master().orNull())) {
+                        // This node is the new master, populate local structure
+                        // from backup
+                        loadFromBackup(did);
+                    } else {
+                        // This node is no longer the master holder,
+                        // clean local structure
+                        removeFromPrimary(did);
+                        // TODO: probably should stop pending backup activities in
+                        // executors to avoid overwriting with old value
+                    }
+                    break;
+                default:
+                    break;
 
             }
         }
     }
 
     // Task to update FlowEntries in backup HZ store
-    // TODO: Should be refactored to contain only one list and not
-    //      toAdd and toRemove
     private final class UpdateBackup implements Runnable {
 
         private final DeviceId deviceId;
-        private final List<FlowRuleBatchEntry> toAdd;
-        private final List<FlowRuleBatchEntry> toRemove;
+        private final Set<FlowRuleBatchEntry> ops;
+
 
         public UpdateBackup(DeviceId deviceId,
-                             List<FlowRuleBatchEntry> toAdd,
-                             List<FlowRuleBatchEntry> list) {
+                            Set<FlowRuleBatchEntry> ops) {
             this.deviceId = checkNotNull(deviceId);
-            this.toAdd = checkNotNull(toAdd);
-            this.toRemove = checkNotNull(list);
+            this.ops = checkNotNull(ops);
+
         }
 
         @Override
         public void run() {
             try {
-                log.trace("update backup {} +{} -{}", deviceId, toAdd, toRemove);
+                log.trace("update backup {} {}", deviceId, ops
+                );
                 final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(deviceId);
-                // Following should be rewritten using async APIs
-                for (FlowRuleBatchEntry bEntry : toAdd) {
-                    final FlowRule entry = bEntry.target();
-                    final FlowId id = entry.id();
-                    ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
-                    List<StoredFlowEntry> list = new ArrayList<>();
-                    if (original != null) {
-                        list.addAll(original);
-                    }
 
-                    list.remove(bEntry.target());
-                    list.add((StoredFlowEntry) entry);
 
-                    ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
-                    boolean success;
-                    if (original == null) {
-                        success = (backupFlowTable.putIfAbsent(id, newValue) == null);
-                    } else {
-                        success = backupFlowTable.replace(id, original, newValue);
-                    }
-                    if (!success) {
-                        log.error("Updating backup failed.");
-                    }
-                }
-                for (FlowRuleBatchEntry bEntry : toRemove) {
-                    final FlowRule entry = bEntry.target();
-                    final FlowId id = entry.id();
-                    ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
-                    List<StoredFlowEntry> list = new ArrayList<>();
-                    if (original != null) {
-                        list.addAll(original);
-                    }
+                ops.stream().forEach(
+                        op -> {
+                            final FlowRule entry = op.getTarget();
+                            final FlowId id = entry.id();
+                            ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
+                            List<StoredFlowEntry> list = new ArrayList<>();
+                            if (original != null) {
+                                list.addAll(original);
+                            }
+                            list.remove(op.getTarget());
+                            if (op.getOperator() == FlowRuleOperation.ADD) {
+                                list.add((StoredFlowEntry) entry);
+                            }
 
-                    list.remove(bEntry.target());
+                            ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
+                            boolean success;
+                            if (original == null) {
+                                success = (backupFlowTable.putIfAbsent(id, newValue) == null);
+                            } else {
+                                success = backupFlowTable.replace(id, original, newValue);
+                            }
+                            if (!success) {
+                                log.error("Updating backup failed.");
+                            }
 
-                    ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
-                    boolean success;
-                    if (original == null) {
-                        success = (backupFlowTable.putIfAbsent(id, newValue) == null);
-                    } else {
-                        success = backupFlowTable.replace(id, original, newValue);
-                    }
-                    if (!success) {
-                        log.error("Updating backup failed.");
-                    }
-                }
+                        }
+                );
             } catch (ExecutionException e) {
                 log.error("Failed to write to backups", e);
             }
 
         }
     }
+
+    private class InternalFlowTable {
+
+        /*
+            TODO: This needs to be cleaned up. Perhaps using the eventually consistent
+            map when it supports distributed to a sequence of instances.
+         */
+
+
+        private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
+                flowEntries = new ConcurrentHashMap<>();
+
+
+        private NewConcurrentHashMap<FlowId, Set<StoredFlowEntry>> lazyEmptyFlowTable() {
+            return NewConcurrentHashMap.<FlowId, Set<StoredFlowEntry>>ifNeeded();
+        }
+
+        /**
+         * Returns the flow table for specified device.
+         *
+         * @param deviceId identifier of the device
+         * @return Map representing Flow Table of given device.
+         */
+        private ConcurrentMap<FlowId, Set<StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
+            return createIfAbsentUnchecked(flowEntries,
+                                           deviceId, lazyEmptyFlowTable());
+        }
+
+        private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
+            final ConcurrentMap<FlowId, Set<StoredFlowEntry>> flowTable = getFlowTable(deviceId);
+            Set<StoredFlowEntry> r = flowTable.get(flowId);
+            if (r == null) {
+                final Set<StoredFlowEntry> concurrentlyAdded;
+                r = new CopyOnWriteArraySet<>();
+                concurrentlyAdded = flowTable.putIfAbsent(flowId, r);
+                if (concurrentlyAdded != null) {
+                    return concurrentlyAdded;
+                }
+            }
+            return r;
+        }
+
+        private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
+            for (StoredFlowEntry f : getFlowEntriesInternal(rule.deviceId(), rule.id())) {
+                if (f.equals(rule)) {
+                    return f;
+                }
+            }
+            return null;
+        }
+
+        private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
+            return getFlowTable(deviceId).values().stream()
+                    .flatMap((list -> list.stream())).collect(Collectors.toSet());
+
+        }
+
+
+        public StoredFlowEntry getFlowEntry(FlowRule rule) {
+            return getFlowEntryInternal(rule);
+        }
+
+        public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
+            return getFlowEntriesInternal(deviceId);
+        }
+
+        public Set<StoredFlowEntry> getFlowEntriesById(FlowEntry entry) {
+            return getFlowEntriesInternal(entry.deviceId(), entry.id());
+        }
+
+        public void add(FlowEntry rule) {
+            ((CopyOnWriteArraySet)
+                    getFlowEntriesInternal(rule.deviceId(), rule.id())).add(rule);
+        }
+
+        public boolean remove(DeviceId deviceId, FlowEntry rule) {
+            return ((CopyOnWriteArraySet)
+                    getFlowEntriesInternal(deviceId, rule.id())).remove(rule);
+            //return flowEntries.remove(deviceId, rule);
+        }
+
+        public void clearDevice(DeviceId did) {
+            flowEntries.remove(did);
+        }
+    }
+
+
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowStoreMessageSubjects.java
index a21d73a..79df272 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowStoreMessageSubjects.java
@@ -34,4 +34,7 @@
 
     public static final MessageSubject REMOVE_FLOW_ENTRY
         = new MessageSubject("peer-forward-remove-flow-entry");
+
+    public static final MessageSubject REMOTE_APPLY_COMPLETED
+            = new MessageSubject("peer-apply-completed");
 }
diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
index 289efe3..7c18820 100644
--- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
@@ -59,15 +59,17 @@
 import org.onosproject.net.device.DefaultDeviceDescription;
 import org.onosproject.net.device.DefaultPortDescription;
 import org.onosproject.net.flow.CompletedBatchOperation;
-import org.onosproject.net.flow.FlowRule;
 import org.onosproject.net.flow.DefaultFlowEntry;
 import org.onosproject.net.flow.DefaultFlowRule;
 import org.onosproject.net.flow.DefaultTrafficSelector;
 import org.onosproject.net.flow.DefaultTrafficTreatment;
 import org.onosproject.net.flow.FlowEntry;
 import org.onosproject.net.flow.FlowId;
+import org.onosproject.net.flow.FlowRule;
 import org.onosproject.net.flow.FlowRuleBatchEntry;
+import org.onosproject.net.flow.FlowRuleBatchEvent;
 import org.onosproject.net.flow.FlowRuleBatchOperation;
+import org.onosproject.net.flow.FlowRuleBatchRequest;
 import org.onosproject.net.flow.StoredFlowEntry;
 import org.onosproject.net.flow.criteria.Criteria;
 import org.onosproject.net.flow.criteria.Criterion;
@@ -162,6 +164,7 @@
             .register(Collections.emptySet().getClass())
             .register(Optional.class)
             .register(Collections.emptyList().getClass())
+            .register(Collections.unmodifiableSet(Collections.emptySet()).getClass())
             .build();
 
     /**
@@ -255,6 +258,9 @@
                     L3ModificationInstruction.L3SubType.class,
                     L3ModificationInstruction.ModIPInstruction.class,
                     RoleInfo.class,
+                    FlowRuleBatchEvent.class,
+                    FlowRuleBatchEvent.Type.class,
+                    FlowRuleBatchRequest.class,
                     FlowRuleBatchOperation.class,
                     CompletedBatchOperation.class,
                     FlowRuleBatchEntry.class,
diff --git a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleFlowRuleStore.java b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleFlowRuleStore.java
index a8e15a2..177618d 100644
--- a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleFlowRuleStore.java
+++ b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleFlowRuleStore.java
@@ -21,13 +21,13 @@
 import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
 import com.google.common.collect.FluentIterable;
-import com.google.common.util.concurrent.Futures;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.SettableFuture;
-
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.NewConcurrentHashMap;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.flow.CompletedBatchOperation;
 import org.onosproject.net.flow.DefaultFlowEntry;
@@ -46,7 +46,6 @@
 import org.onosproject.net.flow.FlowRuleStoreDelegate;
 import org.onosproject.net.flow.StoredFlowEntry;
 import org.onosproject.store.AbstractStore;
-import org.onlab.util.NewConcurrentHashMap;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
@@ -56,7 +55,6 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -261,13 +259,14 @@
     }
 
     @Override
-    public Future<CompletedBatchOperation> storeBatch(
-            FlowRuleBatchOperation batchOperation) {
+    public void storeBatch(
+            FlowRuleBatchOperation operation) {
         List<FlowRuleBatchEntry> toAdd = new ArrayList<>();
         List<FlowRuleBatchEntry> toRemove = new ArrayList<>();
-        for (FlowRuleBatchEntry entry : batchOperation.getOperations()) {
-            final FlowRule flowRule = entry.target();
-            if (entry.operator().equals(FlowRuleOperation.ADD)) {
+
+        for (FlowRuleBatchEntry entry : operation.getOperations()) {
+            final FlowRule flowRule = entry.getTarget();
+            if (entry.getOperator().equals(FlowRuleOperation.ADD)) {
                 if (!getFlowEntries(flowRule.deviceId(), flowRule.id()).contains(flowRule)) {
                     storeFlowRule(flowRule);
                     toAdd.add(entry);
@@ -283,21 +282,27 @@
         }
 
         if (toAdd.isEmpty() && toRemove.isEmpty()) {
-            return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
+            notifyDelegate(FlowRuleBatchEvent.completed(
+                    new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+                    new CompletedBatchOperation(true, Collections.emptySet(),
+                                                operation.deviceId())));
+            return;
         }
 
         SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
         final int batchId = localBatchIdGen.incrementAndGet();
 
         pendingFutures.put(batchId, r);
-        notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove)));
 
-        return r;
+        toAdd.addAll(toRemove);
+        notifyDelegate(FlowRuleBatchEvent.requested(
+                new FlowRuleBatchRequest(batchId, Sets.newHashSet(toAdd)), operation.deviceId()));
+
     }
 
     @Override
     public void batchOperationComplete(FlowRuleBatchEvent event) {
-        final Integer batchId = event.subject().batchId();
+        final Long batchId = event.subject().batchId();
         SettableFuture<CompletedBatchOperation> future
             = pendingFutures.getIfPresent(batchId);
         if (future != null) {
diff --git a/providers/null/device/src/main/java/org/onosproject/provider/nil/device/impl/NullDeviceProvider.java b/providers/null/device/src/main/java/org/onosproject/provider/nil/device/impl/NullDeviceProvider.java
index b2b0b51..fb17bf9 100644
--- a/providers/null/device/src/main/java/org/onosproject/provider/nil/device/impl/NullDeviceProvider.java
+++ b/providers/null/device/src/main/java/org/onosproject/provider/nil/device/impl/NullDeviceProvider.java
@@ -116,7 +116,7 @@
     @Activate
     public void activate(ComponentContext context) {
         providerService = providerRegistry.register(this);
-        if (modified(context)) {
+        if (!modified(context)) {
             deviceBuilder.submit(new DeviceCreator(true));
         }
         log.info("Started");
@@ -173,6 +173,9 @@
             chgd |= true;
         }
         log.info("Using settings numDevices={}, numPorts={}", numDevices, numPorts);
+        if (chgd) {
+            deviceBuilder.submit(new DeviceCreator(true));
+        }
         return chgd;
     }
 
diff --git a/providers/null/flow/src/main/java/org/onosproject/provider/nil/flow/impl/NullFlowRuleProvider.java b/providers/null/flow/src/main/java/org/onosproject/provider/nil/flow/impl/NullFlowRuleProvider.java
index e882c76..0686349 100644
--- a/providers/null/flow/src/main/java/org/onosproject/provider/nil/flow/impl/NullFlowRuleProvider.java
+++ b/providers/null/flow/src/main/java/org/onosproject/provider/nil/flow/impl/NullFlowRuleProvider.java
@@ -15,9 +15,7 @@
  */
 package org.onosproject.provider.nil.flow.impl;
 
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.util.concurrent.Futures;
+import com.google.common.collect.Sets;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -29,12 +27,12 @@
 import org.onlab.util.Timer;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.net.DeviceId;
-import org.onosproject.net.flow.BatchOperation;
 import org.onosproject.net.flow.CompletedBatchOperation;
 import org.onosproject.net.flow.DefaultFlowEntry;
 import org.onosproject.net.flow.FlowEntry;
 import org.onosproject.net.flow.FlowRule;
 import org.onosproject.net.flow.FlowRuleBatchEntry;
+import org.onosproject.net.flow.FlowRuleBatchOperation;
 import org.onosproject.net.flow.FlowRuleProvider;
 import org.onosproject.net.flow.FlowRuleProviderRegistry;
 import org.onosproject.net.flow.FlowRuleProviderService;
@@ -43,7 +41,9 @@
 import org.slf4j.Logger;
 
 import java.util.Collections;
-import java.util.concurrent.Future;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 
 import static org.slf4j.LoggerFactory.getLogger;
@@ -59,7 +59,7 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected FlowRuleProviderRegistry providerRegistry;
 
-    private Multimap<DeviceId, FlowEntry> flowTable = HashMultimap.create();
+    private ConcurrentMap<DeviceId, Set<FlowEntry>> flowTable = new ConcurrentHashMap<>();
 
     private FlowRuleProviderService providerService;
 
@@ -88,18 +88,10 @@
     }
 
     @Override
-    public void applyFlowRule(FlowRule... flowRules) {
-        for (int i = 0; i < flowRules.length; i++) {
-            flowTable.put(flowRules[i].deviceId(), new DefaultFlowEntry(flowRules[i]));
-        }
-    }
+    public void applyFlowRule(FlowRule... flowRules) {}
 
     @Override
-    public void removeFlowRule(FlowRule... flowRules) {
-        for (int i = 0; i < flowRules.length; i++) {
-            flowTable.remove(flowRules[i].deviceId(), flowRules[i]);
-        }
-    }
+    public void removeFlowRule(FlowRule... flowRules) {}
 
     @Override
     public void removeRulesById(ApplicationId id, FlowRule... flowRules) {
@@ -107,26 +99,32 @@
     }
 
     @Override
-    public Future<CompletedBatchOperation> executeBatch(
-            BatchOperation<FlowRuleBatchEntry> batch) {
+    public void executeBatch(
+            FlowRuleBatchOperation batch) {
+        Set<FlowEntry> flowRules = flowTable.getOrDefault(batch.deviceId(), Sets.newConcurrentHashSet());
         for (FlowRuleBatchEntry fbe : batch.getOperations()) {
             switch (fbe.operator()) {
                 case ADD:
-                    applyFlowRule(fbe.target());
+                    flowRules.add(new DefaultFlowEntry(fbe.target()));
                     break;
                 case REMOVE:
-                    removeFlowRule(fbe.target());
+                    flowRules.remove(new DefaultFlowEntry(fbe.target()));
                     break;
                 case MODIFY:
-                    removeFlowRule(fbe.target());
-                    applyFlowRule(fbe.target());
+                    FlowEntry entry = new DefaultFlowEntry(fbe.target());
+                    flowRules.remove(entry);
+                    flowRules.add(entry);
                     break;
                 default:
                     log.error("Unknown flow operation: {}", fbe);
             }
         }
-        return Futures.immediateFuture(
-                new CompletedBatchOperation(true, Collections.emptySet()));
+        flowTable.put(batch.deviceId(), flowRules);
+        providerService.batchOperationCompleted(batch.id(),
+                                                new CompletedBatchOperation(
+                                                        true,
+                                                        Collections.emptySet(),
+                                                        batch.deviceId()));
     }
 
     private class StatisticTask implements TimerTask {
@@ -134,10 +132,11 @@
         @Override
         public void run(Timeout to) throws Exception {
             for (DeviceId devId : flowTable.keySet()) {
-                providerService.pushFlowMetrics(devId, flowTable.get(devId));
+                    providerService.pushFlowMetrics(devId,
+                                                    flowTable.getOrDefault(devId, Collections.emptySet()));
             }
-
             timeout = timer.newTimeout(to.getTask(), 5, TimeUnit.SECONDS);
+
         }
     }
 }
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowEntryBuilder.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowEntryBuilder.java
index 04ff897..8f2b5f4 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowEntryBuilder.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowEntryBuilder.java
@@ -35,6 +35,7 @@
 import org.onosproject.net.flow.TrafficSelector;
 import org.onosproject.net.flow.TrafficTreatment;
 import org.onosproject.openflow.controller.Dpid;
+import org.projectfloodlight.openflow.protocol.OFFlowMod;
 import org.projectfloodlight.openflow.protocol.OFFlowRemoved;
 import org.projectfloodlight.openflow.protocol.OFFlowStatsEntry;
 import org.projectfloodlight.openflow.protocol.OFInstructionType;
@@ -74,13 +75,16 @@
 
     private final OFFlowStatsEntry stat;
     private final OFFlowRemoved removed;
+    private final OFFlowMod flowMod;
 
     private final Match match;
     private final List<OFAction> actions;
 
     private final Dpid dpid;
 
-    private final boolean addedRule;
+    public enum FlowType { STAT, REMOVED, MOD }
+
+    private final FlowType type;
 
 
     public FlowEntryBuilder(Dpid dpid, OFFlowStatsEntry entry) {
@@ -89,7 +93,8 @@
         this.actions = getActions(entry);
         this.dpid = dpid;
         this.removed = null;
-        this.addedRule = true;
+        this.flowMod = null;
+        this.type = FlowType.STAT;
     }
 
     public FlowEntryBuilder(Dpid dpid, OFFlowRemoved removed) {
@@ -99,26 +104,48 @@
         this.dpid = dpid;
         this.actions = null;
         this.stat = null;
-        this.addedRule = false;
+        this.flowMod = null;
+        this.type = FlowType.REMOVED;
 
     }
 
-    public FlowEntry build() {
-        if (addedRule) {
-            FlowRule rule = new DefaultFlowRule(DeviceId.deviceId(Dpid.uri(dpid)),
-                    buildSelector(), buildTreatment(), stat.getPriority(),
-                    stat.getCookie().getValue(), stat.getIdleTimeout(), false);
-            return new DefaultFlowEntry(rule, FlowEntryState.ADDED,
-                    stat.getDurationSec(), stat.getPacketCount().getValue(),
-                    stat.getByteCount().getValue());
+    public FlowEntryBuilder(Dpid dpid, OFFlowMod fm) {
+        this.match = fm.getMatch();
+        this.dpid = dpid;
+        this.actions = fm.getActions();
+        this.type = FlowType.MOD;
+        this.flowMod = fm;
+        this.stat = null;
+        this.removed = null;
+    }
 
-        } else {
-            FlowRule rule = new DefaultFlowRule(DeviceId.deviceId(Dpid.uri(dpid)),
-                    buildSelector(), null, removed.getPriority(),
-                   removed.getCookie().getValue(), removed.getIdleTimeout(), false);
-            return new DefaultFlowEntry(rule, FlowEntryState.REMOVED, removed.getDurationSec(),
-                    removed.getPacketCount().getValue(), removed.getByteCount().getValue());
+    public FlowEntry build(FlowEntryState... state) {
+        FlowRule rule;
+        switch (this.type) {
+            case STAT:
+                rule = new DefaultFlowRule(DeviceId.deviceId(Dpid.uri(dpid)),
+                                      buildSelector(), buildTreatment(), stat.getPriority(),
+                                      stat.getCookie().getValue(), stat.getIdleTimeout(), false);
+                return new DefaultFlowEntry(rule, FlowEntryState.ADDED,
+                                      stat.getDurationSec(), stat.getPacketCount().getValue(),
+                                      stat.getByteCount().getValue());
+            case REMOVED:
+                rule = new DefaultFlowRule(DeviceId.deviceId(Dpid.uri(dpid)),
+                                      buildSelector(), null, removed.getPriority(),
+                                      removed.getCookie().getValue(), removed.getIdleTimeout(), false);
+                return new DefaultFlowEntry(rule, FlowEntryState.REMOVED, removed.getDurationSec(),
+                                      removed.getPacketCount().getValue(), removed.getByteCount().getValue());
+            case MOD:
+                FlowEntryState flowState = state.length > 0 ? state[0] : FlowEntryState.FAILED;
+                rule = new DefaultFlowRule(DeviceId.deviceId(Dpid.uri(dpid)),
+                                      buildSelector(), buildTreatment(), flowMod.getPriority(),
+                                      flowMod.getCookie().getValue(), flowMod.getIdleTimeout(), false);
+                return new DefaultFlowEntry(rule, flowState, 0, 0, 0);
+            default:
+                log.error("Unknown flow type : {}", this.type);
+                return null;
         }
+
     }
 
     private List<OFAction> getActions(OFFlowStatsEntry entry) {
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
index dd0bcaa..70d35aa 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -15,25 +15,13 @@
  */
 package org.onosproject.provider.of.flow.impl;
 
-import static com.google.common.base.Preconditions.checkState;
-import static org.slf4j.LoggerFactory.getLogger;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -41,19 +29,16 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.net.DeviceId;
-import org.onosproject.net.flow.BatchOperation;
 import org.onosproject.net.flow.CompletedBatchOperation;
-import org.onosproject.net.flow.DefaultFlowEntry;
 import org.onosproject.net.flow.FlowEntry;
 import org.onosproject.net.flow.FlowRule;
 import org.onosproject.net.flow.FlowRuleBatchEntry;
-import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
+import org.onosproject.net.flow.FlowRuleBatchOperation;
 import org.onosproject.net.flow.FlowRuleProvider;
 import org.onosproject.net.flow.FlowRuleProviderRegistry;
 import org.onosproject.net.flow.FlowRuleProviderService;
 import org.onosproject.net.provider.AbstractProvider;
 import org.onosproject.net.provider.ProviderId;
-import org.onosproject.net.topology.TopologyService;
 import org.onosproject.openflow.controller.Dpid;
 import org.onosproject.openflow.controller.OpenFlowController;
 import org.onosproject.openflow.controller.OpenFlowEventListener;
@@ -63,6 +48,7 @@
 import org.projectfloodlight.openflow.protocol.OFActionType;
 import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
 import org.projectfloodlight.openflow.protocol.OFErrorMsg;
+import org.projectfloodlight.openflow.protocol.OFErrorType;
 import org.projectfloodlight.openflow.protocol.OFFlowMod;
 import org.projectfloodlight.openflow.protocol.OFFlowRemoved;
 import org.projectfloodlight.openflow.protocol.OFFlowStatsEntry;
@@ -75,21 +61,22 @@
 import org.projectfloodlight.openflow.protocol.OFVersion;
 import org.projectfloodlight.openflow.protocol.action.OFAction;
 import org.projectfloodlight.openflow.protocol.action.OFActionOutput;
-import org.projectfloodlight.openflow.protocol.errormsg.OFBadActionErrorMsg;
-import org.projectfloodlight.openflow.protocol.errormsg.OFBadInstructionErrorMsg;
-import org.projectfloodlight.openflow.protocol.errormsg.OFBadMatchErrorMsg;
-import org.projectfloodlight.openflow.protocol.errormsg.OFBadRequestErrorMsg;
 import org.projectfloodlight.openflow.protocol.errormsg.OFFlowModFailedErrorMsg;
 import org.projectfloodlight.openflow.protocol.instruction.OFInstruction;
 import org.projectfloodlight.openflow.protocol.instruction.OFInstructionApplyActions;
 import org.projectfloodlight.openflow.types.OFPort;
 import org.slf4j.Logger;
 
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
 
 /**
  * Provider which uses an OpenFlow controller to detect network
@@ -98,8 +85,6 @@
 @Component(immediate = true)
 public class OpenFlowRuleProvider extends AbstractProvider implements FlowRuleProvider {
 
-    enum BatchState { STARTED, FINISHED, CANCELLED }
-
     private static final int LOWEST_PRIORITY = 0;
 
     private final Logger log = getLogger(getClass());
@@ -110,22 +95,15 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected OpenFlowController controller;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected TopologyService topologyService;
 
     private FlowRuleProviderService providerService;
 
     private final InternalFlowProvider listener = new InternalFlowProvider();
 
-    // FIXME: This should be an expiring map to ensure futures that don't have
-    // a future eventually get garbage collected.
-    private final Map<Long, InstallationFuture> pendingFutures = new ConcurrentHashMap<>();
-
-    private final Map<Long, InstallationFuture> pendingFMs = new ConcurrentHashMap<>();
+    private Cache<Long, InternalCacheEntry> pendingBatches;
 
     private final Map<Dpid, FlowStatsCollector> collectors = Maps.newHashMap();
 
-    private final AtomicLong xidCounter = new AtomicLong(1);
 
     /**
      * Creates an OpenFlow host provider.
@@ -140,6 +118,16 @@
         controller.addListener(listener);
         controller.addEventListener(listener);
 
+        pendingBatches = CacheBuilder.newBuilder()
+                .expireAfterWrite(10, TimeUnit.SECONDS)
+                .removalListener((RemovalNotification<Long, InternalCacheEntry> notification) -> {
+                    if (notification.getCause() == RemovalCause.EXPIRED) {
+                        providerService.batchOperationCompleted(notification.getKey(),
+                                                                notification.getValue().failedCompletion());
+                    }
+                }).build();
+
+
         for (OpenFlowSwitch sw : controller.getSwitches()) {
             FlowStatsCollector fsc = new FlowStatsCollector(sw, POLL_INTERVAL);
             fsc.start();
@@ -160,8 +148,8 @@
 
     @Override
     public void applyFlowRule(FlowRule... flowRules) {
-        for (int i = 0; i < flowRules.length; i++) {
-            applyRule(flowRules[i]);
+        for (FlowRule flowRule : flowRules) {
+            applyRule(flowRule);
         }
     }
 
@@ -179,8 +167,8 @@
 
     @Override
     public void removeFlowRule(FlowRule... flowRules) {
-        for (int i = 0; i < flowRules.length; i++) {
-            removeRule(flowRules[i]);
+        for (FlowRule flowRule : flowRules) {
+            removeRule(flowRule);
         }
 
     }
@@ -203,36 +191,20 @@
     }
 
     @Override
-    public Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
-        final Set<Dpid> sws = Sets.newConcurrentHashSet();
-        final Map<Long, FlowRuleBatchEntry> fmXids = new HashMap<>();
 
-        /*
-         * Use identity hash map for reference equality as we could have equal
-         * flow mods for different switches.
-         */
-        Map<OFFlowMod, OpenFlowSwitch> mods = Maps.newIdentityHashMap();
-        Map<OFFlowMod, OpenFlowSwitch.TableType> modTypes = Maps.newIdentityHashMap();
+    public void executeBatch(FlowRuleBatchOperation batch) {
+
+        pendingBatches.put(batch.id(), new InternalCacheEntry(batch));
+
+
+        OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(batch.deviceId().uri()));
+        OFFlowMod mod;
+
         for (FlowRuleBatchEntry fbe : batch.getOperations()) {
-            FlowRule flowRule = fbe.target();
-            final Dpid dpid = Dpid.dpid(flowRule.deviceId().uri());
-            OpenFlowSwitch sw = controller.getSwitch(dpid);
-            if (sw == null) {
-                /*
-                 * if a switch we are supposed to install to is gone then
-                 * cancel (ie. rollback) the work that has been done so far
-                 * and return the associated future.
-                 */
-                InstallationFuture failed = new InstallationFuture(sws, fmXids);
-                failed.cancel(true);
-                return failed;
-            }
-            sws.add(dpid);
-            final Long flowModXid = xidCounter.getAndIncrement();
+
             FlowModBuilder builder =
-                    FlowModBuilder.builder(flowRule, sw.factory(),
-                                           Optional.of(flowModXid));
-            OFFlowMod mod = null;
+                    FlowModBuilder.builder(fbe.target(), sw.factory(),
+                                           Optional.of(batch.id()));
             switch (fbe.operator()) {
                 case ADD:
                     mod = builder.buildFlowAdd();
@@ -244,34 +216,16 @@
                     mod = builder.buildFlowMod();
                     break;
                 default:
-                    log.error("Unsupported batch operation {}", fbe.operator());
-            }
-            if (mod != null) {
-                mods.put(mod, sw);
-                modTypes.put(mod, getTableType(flowRule.type()));
-                fmXids.put(flowModXid, fbe);
-            } else {
-                log.error("Conversion of flowrule {} failed.", flowRule);
-            }
+                    log.error("Unsupported batch operation {}; skipping flowmod {}",
+                              fbe.operator(), fbe);
+                    continue;
+                }
+            sw.sendMsg(mod);
         }
-        InstallationFuture installation = new InstallationFuture(sws, fmXids);
-        for (Long xid : fmXids.keySet()) {
-            pendingFMs.put(xid, installation);
-        }
-
-        pendingFutures.put(installation.xid(), installation);
-        for (Map.Entry<OFFlowMod, OpenFlowSwitch> entry : mods.entrySet()) {
-            OpenFlowSwitch sw = entry.getValue();
-            OFFlowMod mod = entry.getKey();
-            OpenFlowSwitch.TableType tableType = modTypes.get(mod);
-            if (tableType == OpenFlowSwitch.TableType.NONE) {
-                sw.sendMsg(mod);
-            } else {
-                sw.sendMsg(mod, tableType);
-            }
-        }
-        installation.verify();
-        return installation;
+        OFBarrierRequest.Builder builder = sw.factory()
+                .buildBarrierRequest()
+                .setXid(batch.id());
+        sw.sendMsg(builder.build());
     }
 
     private OpenFlowSwitch.TableType getTableType(FlowRule.Type type) {
@@ -287,13 +241,12 @@
         }
     }
 
+
+
+
     private class InternalFlowProvider
             implements OpenFlowSwitchListener, OpenFlowEventListener {
 
-
-        private final Multimap<DeviceId, FlowEntry> completeEntries =
-                ArrayListMultimap.create();
-
         @Override
         public void switchAdded(Dpid dpid) {
             FlowStatsCollector fsc = new FlowStatsCollector(controller.getSwitch(dpid), POLL_INTERVAL);
@@ -320,7 +273,6 @@
 
         @Override
         public void handleMessage(Dpid dpid, OFMessage msg) {
-            InstallationFuture future = null;
             switch (msg.getType()) {
                 case FLOW_REMOVED:
                     OFFlowRemoved removed = (OFFlowRemoved) msg;
@@ -334,22 +286,42 @@
                     }
                     break;
                 case BARRIER_REPLY:
-                    future = pendingFutures.get(msg.getXid());
-                    if (future != null) {
-                        future.satisfyRequirement(dpid);
-                    } else {
-                        log.warn("Received unknown Barrier Reply: {}", msg.getXid());
+                    try {
+                        InternalCacheEntry entry = pendingBatches.getIfPresent(msg.getXid());
+                        if (entry != null) {
+                            providerService.batchOperationCompleted(msg.getXid(), entry.completed());
+                        } else {
+                            log.warn("Received unknown Barrier Reply: {}", msg.getXid());
+                        }
+                    } finally {
+                        pendingBatches.invalidate(msg.getXid());
                     }
                     break;
                 case ERROR:
                     log.warn("received Error message {} from {}", msg, dpid);
-                    future = pendingFMs.get(msg.getXid());
-                    if (future != null) {
-                        future.fail((OFErrorMsg) msg, dpid);
+
+                    OFErrorMsg error = (OFErrorMsg) msg;
+                    if (error.getErrType() == OFErrorType.FLOW_MOD_FAILED) {
+                        OFFlowModFailedErrorMsg fmFailed = (OFFlowModFailedErrorMsg) error;
+                        if (fmFailed.getData().getParsedMessage().isPresent()) {
+                            OFMessage m = fmFailed.getData().getParsedMessage().get();
+                            OFFlowMod fm = (OFFlowMod) m;
+                            InternalCacheEntry entry = pendingBatches.getIfPresent(msg.getXid());
+                            if (entry != null) {
+                                entry.appendFailure(new FlowEntryBuilder(dpid, fm).build());
+                            } else {
+                                log.error("No matching batch for this error: {}", error);
+                            }
+                        } else {
+                            //FIXME: Potentially add flowtracking to avoid this message.
+                            log.error("Flow installation failed but switch didn't" +
+                                              " tell us which one.");
+                        }
                     } else {
-                        log.warn("Received unknown Error Reply: {} {}", msg.getXid(), msg);
+                        log.warn("Received error {}", error);
                     }
-                    break;
+
+
                 default:
                     log.debug("Unhandled message type: {}", msg.getType());
             }
@@ -402,198 +374,50 @@
 
     }
 
-    private class InstallationFuture implements Future<CompletedBatchOperation> {
+    /**
+     * The internal cache entry holding the original request as well
+     * as accumulating the any failures along the way.
+     *
+     * If this entry is evicted from the cache then the entire operation
+     * is considered failed. Otherwise, only the failures reported by the device
+     * will be propagated up.
+     */
+    private class InternalCacheEntry {
 
-        // barrier xid
-        private final Long xid;
-        // waiting for barrier reply from...
-        private final Set<Dpid> sws;
-        private final AtomicBoolean ok = new AtomicBoolean(true);
-        // FlowMod xid ->
-        private final Map<Long, FlowRuleBatchEntry> fms;
+        private final FlowRuleBatchOperation operation;
+        private final Set<FlowRule> failures = Sets.newConcurrentHashSet();
 
-
-        private final Set<FlowEntry> offendingFlowMods = Sets.newHashSet();
-        // Failed batch operation id
-        private Long failedId;
-
-        private final CountDownLatch countDownLatch;
-        private BatchState state;
-
-        public InstallationFuture(Set<Dpid> sws, Map<Long, FlowRuleBatchEntry> fmXids) {
-            this.xid = xidCounter.getAndIncrement();
-            this.state = BatchState.STARTED;
-            this.sws = sws;
-            this.fms = fmXids;
-            countDownLatch = new CountDownLatch(sws.size());
+        public InternalCacheEntry(FlowRuleBatchOperation operation) {
+            this.operation = operation;
         }
 
-        public Long xid() {
-            return xid;
+        /**
+         * Appends a failed rule to the set of failed items.
+         * @param rule the failed rule
+         */
+        public void appendFailure(FlowRule rule) {
+            failures.add(rule);
         }
 
-        public void fail(OFErrorMsg msg, Dpid dpid) {
-
-            ok.set(false);
-            FlowEntry fe = null;
-            FlowRuleBatchEntry fbe = fms.get(msg.getXid());
-            failedId = fbe.id();
-            FlowRule offending = fbe.target();
-            //TODO handle specific error msgs
-            switch (msg.getErrType()) {
-                case BAD_ACTION:
-                    OFBadActionErrorMsg bad = (OFBadActionErrorMsg) msg;
-                    fe = new DefaultFlowEntry(offending, bad.getErrType().ordinal(),
-                                              bad.getCode().ordinal());
-                    break;
-                case BAD_INSTRUCTION:
-                    OFBadInstructionErrorMsg badins = (OFBadInstructionErrorMsg) msg;
-                    fe = new DefaultFlowEntry(offending, badins.getErrType().ordinal(),
-                                              badins.getCode().ordinal());
-                    break;
-                case BAD_MATCH:
-                    OFBadMatchErrorMsg badMatch = (OFBadMatchErrorMsg) msg;
-                    fe = new DefaultFlowEntry(offending, badMatch.getErrType().ordinal(),
-                                              badMatch.getCode().ordinal());
-                    break;
-                case BAD_REQUEST:
-                    OFBadRequestErrorMsg badReq = (OFBadRequestErrorMsg) msg;
-                    fe = new DefaultFlowEntry(offending, badReq.getErrType().ordinal(),
-                                              badReq.getCode().ordinal());
-                    break;
-                case FLOW_MOD_FAILED:
-                    OFFlowModFailedErrorMsg fmFail = (OFFlowModFailedErrorMsg) msg;
-                    fe = new DefaultFlowEntry(offending, fmFail.getErrType().ordinal(),
-                                              fmFail.getCode().ordinal());
-                    break;
-                case EXPERIMENTER:
-                case GROUP_MOD_FAILED:
-                case HELLO_FAILED:
-                case METER_MOD_FAILED:
-                case PORT_MOD_FAILED:
-                case QUEUE_OP_FAILED:
-                case ROLE_REQUEST_FAILED:
-                case SWITCH_CONFIG_FAILED:
-                case TABLE_FEATURES_FAILED:
-                case TABLE_MOD_FAILED:
-                    fe = new DefaultFlowEntry(offending, msg.getErrType().ordinal(), 0);
-                    break;
-                default:
-                    log.error("Unknown error type {}", msg.getErrType());
-
-            }
-            offendingFlowMods.add(fe);
-
-            removeRequirement(dpid);
+        /**
+         * Fails the entire batch and returns the failed operation.
+         * @return the failed operation
+         */
+        public CompletedBatchOperation failedCompletion() {
+            Set<FlowRule> fails = operation.getOperations().stream()
+                    .map(op -> op.target()).collect(Collectors.toSet());
+            return new CompletedBatchOperation(false, Collections.unmodifiableSet(fails), operation.deviceId());
         }
 
-
-        public void satisfyRequirement(Dpid dpid) {
-            log.debug("Satisfaction from switch {}", dpid);
-            removeRequirement(dpid);
+        /**
+         * Returns the completed operation and whether the batch suceeded.
+         * @return the completed operation
+         */
+        public CompletedBatchOperation completed() {
+            return new CompletedBatchOperation(failures.isEmpty(),
+                                               Collections.unmodifiableSet(failures), operation.deviceId());
         }
 
-
-        public void verify() {
-            checkState(!sws.isEmpty());
-            for (Dpid dpid : sws) {
-                OpenFlowSwitch sw = controller.getSwitch(dpid);
-                OFBarrierRequest.Builder builder = sw.factory()
-                        .buildBarrierRequest()
-                        .setXid(xid);
-                sw.sendMsg(builder.build());
-            }
-        }
-
-        @Override
-        public boolean cancel(boolean mayInterruptIfRunning) {
-            if (isDone()) {
-                return false;
-            }
-            ok.set(false);
-            this.state = BatchState.CANCELLED;
-            cleanUp();
-            for (FlowRuleBatchEntry fbe : fms.values()) {
-                if (fbe.operator() == FlowRuleOperation.ADD ||
-                        fbe.operator() == FlowRuleOperation.MODIFY) {
-                    removeFlowRule(fbe.target());
-                } else if (fbe.operator() == FlowRuleOperation.REMOVE) {
-                    applyRule(fbe.target());
-                }
-
-            }
-            return true;
-        }
-
-        @Override
-        public boolean isCancelled() {
-            return this.state == BatchState.CANCELLED;
-        }
-
-        @Override
-        public boolean isDone() {
-            return this.state == BatchState.FINISHED || isCancelled();
-        }
-
-        @Override
-        public CompletedBatchOperation get() throws InterruptedException, ExecutionException {
-            countDownLatch.await();
-            this.state = BatchState.FINISHED;
-            Set<Long> failedIds = (failedId != null) ?  Sets.newHashSet(failedId) : Collections.emptySet();
-            CompletedBatchOperation result =
-                    new CompletedBatchOperation(ok.get(), offendingFlowMods, failedIds);
-            //FIXME do cleanup here (moved by BOC)
-            cleanUp();
-            return result;
-        }
-
-        @Override
-        public CompletedBatchOperation get(long timeout, TimeUnit unit)
-                throws InterruptedException, ExecutionException,
-                TimeoutException {
-            if (countDownLatch.await(timeout, unit)) {
-                this.state = BatchState.FINISHED;
-                Set<Long> failedIds = (failedId != null) ?  Sets.newHashSet(failedId) : Collections.emptySet();
-                CompletedBatchOperation result =
-                        new CompletedBatchOperation(ok.get(), offendingFlowMods, failedIds);
-                // FIXME do cleanup here (moved by BOC)
-                cleanUp();
-                return result;
-            }
-            throw new TimeoutException(this.toString());
-        }
-
-        private void cleanUp() {
-            if (isDone() || isCancelled()) {
-                pendingFutures.remove(xid);
-                for (Long xid : fms.keySet()) {
-                    pendingFMs.remove(xid);
-                }
-            }
-        }
-
-        private void removeRequirement(Dpid dpid) {
-            countDownLatch.countDown();
-            sws.remove(dpid);
-            //FIXME don't do cleanup here (moved by BOC)
-            //cleanUp();
-        }
-
-        @Override
-        public String toString() {
-            return MoreObjects.toStringHelper(getClass())
-                    .add("xid", xid)
-                    .add("pending devices", sws)
-                    .add("devices in batch",
-                         fms.values().stream()
-                             .map((fbe) -> fbe.target().deviceId())
-                             .distinct().collect(Collectors.toList()))
-                    .add("failedId", failedId)
-                    .add("latchCount", countDownLatch.getCount())
-                    .add("state", state)
-                    .add("no error?", ok.get())
-                    .toString();
-        }
     }
 
 }
diff --git a/tools/package/etc/org.onosproject.provider.nil.device.impl.NullDeviceProvider.cfg b/tools/package/etc/org.onosproject.provider.nil.device.impl.NullDeviceProvider.cfg
index 375ed48..1c3a009 100644
--- a/tools/package/etc/org.onosproject.provider.nil.device.impl.NullDeviceProvider.cfg
+++ b/tools/package/etc/org.onosproject.provider.nil.device.impl.NullDeviceProvider.cfg
@@ -2,7 +2,7 @@
 # Instance-specific configurations, in this case, the number of 
 # devices per node.
 #
-devConfigs = 192.168.56.30:5,192.168.56.40:7
+devConfigs = 192.168.97.132:5,192.168.97.131:5
 
 #
 # Number of ports per device. This is global to all devices
diff --git a/tools/package/etc/org.onosproject.provider.nil.link.impl.NullLinkProvider.cfg b/tools/package/etc/org.onosproject.provider.nil.link.impl.NullLinkProvider.cfg
index afc3fd9..a1b1d16 100644
--- a/tools/package/etc/org.onosproject.provider.nil.link.impl.NullLinkProvider.cfg
+++ b/tools/package/etc/org.onosproject.provider.nil.link.impl.NullLinkProvider.cfg
@@ -9,4 +9,4 @@
 #
 # Set order of islands to chain together, in a line.
 #
-neighbors = 192.168.56.20,192.168.56.30,192.168.56.40
+neighbors = 192.168.97.132,192.168.97.131