Merge branch 'master' into optical-integration
diff --git a/apps/sdnip/src/main/resources/config-examples/sdnip.json b/apps/sdnip/src/main/resources/config-examples/sdnip.json
index b9a2d56..13f4db8 100644
--- a/apps/sdnip/src/main/resources/config-examples/sdnip.json
+++ b/apps/sdnip/src/main/resources/config-examples/sdnip.json
@@ -14,6 +14,16 @@
             "attachmentDpid" : "00:00:00:00:00:00:00:a2",
 	    "attachmentPort" : "1",
             "ipAddress" : "192.168.30.1"
+        },
+	{
+            "attachmentDpid" : "00:00:00:00:00:00:00:a6",
+	    "attachmentPort" : "1",
+            "ipAddress" : "192.168.40.1"
+        },
+	{
+            "attachmentDpid" : "00:00:00:00:00:00:00:a4",
+	    "attachmentPort" : "4",
+            "ipAddress" : "192.168.60.1"
         }
     ],
     "bgpSpeakers" : [
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/BatchOperationResult.java b/core/api/src/main/java/org/onlab/onos/net/flow/BatchOperationResult.java
index 6352b53..d2db96b 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/BatchOperationResult.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/BatchOperationResult.java
@@ -18,7 +18,7 @@
  */
 package org.onlab.onos.net.flow;
 
-import java.util.List;
+import java.util.Set;
 
 /**
  * Interface capturing the result of a batch operation.
@@ -33,9 +33,9 @@
     boolean isSuccess();
 
     /**
-     * Obtains a list of items which failed.
-     * @return a list of failures
+     * Obtains a set of items which failed.
+     * @return a set of failures
      */
-    List<T> failedItems();
+    Set<T> failedItems();
 
 }
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java b/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
index 841e948..363831c 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
@@ -18,19 +18,19 @@
  */
 package org.onlab.onos.net.flow;
 
-import java.util.List;
+import java.util.Set;
 
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 
 public class CompletedBatchOperation implements BatchOperationResult<FlowEntry> {
 
 
     private final boolean success;
-    private final List<FlowEntry> failures;
+    private final Set<FlowEntry> failures;
 
-    public CompletedBatchOperation(boolean success, List<FlowEntry> failures) {
+    public CompletedBatchOperation(boolean success, Set<FlowEntry> failures) {
         this.success = success;
-        this.failures = ImmutableList.copyOf(failures);
+        this.failures = ImmutableSet.copyOf(failures);
     }
 
     @Override
@@ -39,7 +39,7 @@
     }
 
     @Override
-    public List<FlowEntry> failedItems() {
+    public Set<FlowEntry> failedItems() {
         return failures;
     }
 
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEvent.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEvent.java
new file mode 100644
index 0000000..4ba3366
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEvent.java
@@ -0,0 +1,67 @@
+package org.onlab.onos.net.flow;
+
+import org.onlab.onos.event.AbstractEvent;
+
+/**
+ * Describes flow rule batch event.
+ */
+public final class FlowRuleBatchEvent extends AbstractEvent<FlowRuleBatchEvent.Type, FlowRuleBatchRequest> {
+
+    /**
+     * Type of flow rule events.
+     */
+    public enum Type {
+
+        /**
+         * Signifies that a batch operation has been initiated.
+         */
+        BATCH_OPERATION_REQUESTED,
+
+        /**
+         * Signifies that a batch operation has completed.
+         */
+        BATCH_OPERATION_COMPLETED,
+    }
+
+    private final CompletedBatchOperation result;
+
+    /**
+     * Constructs a new FlowRuleBatchEvent.
+     * @param request batch operation request.
+     * @return event.
+     */
+    public static FlowRuleBatchEvent create(FlowRuleBatchRequest request) {
+        FlowRuleBatchEvent event = new FlowRuleBatchEvent(Type.BATCH_OPERATION_REQUESTED, request, null);
+        return event;
+    }
+
+    /**
+     * Constructs a new FlowRuleBatchEvent.
+     * @param request batch operation request.
+     * @param result completed batch operation result.
+     * @return event.
+     */
+    public static FlowRuleBatchEvent create(FlowRuleBatchRequest request, CompletedBatchOperation result) {
+        FlowRuleBatchEvent event = new FlowRuleBatchEvent(Type.BATCH_OPERATION_COMPLETED, request, result);
+        return event;
+    }
+
+    /**
+     * Returns the result of this batch operation.
+     * @return batch operation result.
+     */
+    public CompletedBatchOperation result() {
+        return result;
+    }
+
+    /**
+     * Creates an event of a given type and for the specified flow rule batch.
+     *
+     * @param type    flow rule batch event type
+     * @param batch    event flow rule batch subject
+     */
+    private FlowRuleBatchEvent(Type type, FlowRuleBatchRequest request, CompletedBatchOperation result) {
+        super(type, request);
+        this.result = result;
+    }
+}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchRequest.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchRequest.java
new file mode 100644
index 0000000..0414fcb
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchRequest.java
@@ -0,0 +1,38 @@
+package org.onlab.onos.net.flow;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
+
+import com.google.common.collect.Lists;
+
+public class FlowRuleBatchRequest {
+
+    private final List<FlowEntry> toAdd;
+    private final List<FlowEntry> toRemove;
+
+    public FlowRuleBatchRequest(List<FlowEntry> toAdd, List<FlowEntry> toRemove) {
+        this.toAdd = Collections.unmodifiableList(toAdd);
+        this.toRemove = Collections.unmodifiableList(toRemove);
+    }
+
+    public List<FlowEntry> toAdd() {
+        return toAdd;
+    }
+
+    public List<FlowEntry> toRemove() {
+        return toRemove;
+    }
+
+    public FlowRuleBatchOperation asBatchOperation() {
+        List<FlowRuleBatchEntry> entries = Lists.newArrayList();
+        for (FlowEntry e : toAdd) {
+            entries.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, e));
+        }
+        for (FlowEntry e : toRemove) {
+            entries.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, e));
+        }
+        return new FlowRuleBatchOperation(entries);
+    }
+}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
index 0b2c3d8..ae74ac5 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
@@ -18,11 +18,11 @@
  */
 package org.onlab.onos.net.flow;
 
-import java.util.concurrent.Future;
-
 import org.onlab.onos.ApplicationId;
 import org.onlab.onos.net.provider.Provider;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 /**
  * Abstraction of a flow rule provider.
  */
@@ -60,6 +60,6 @@
      * @param batch a batch of flow rules
      * @return a future indicating the status of this execution
      */
-    Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch);
+    ListenableFuture<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch);
 
 }
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStore.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStore.java
index 63b7f77..11bd4ad 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStore.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStore.java
@@ -18,6 +18,8 @@
  */
 package org.onlab.onos.net.flow;
 
+import java.util.concurrent.Future;
+
 import org.onlab.onos.ApplicationId;
 import org.onlab.onos.net.DeviceId;
 import org.onlab.onos.store.Store;
@@ -25,7 +27,7 @@
 /**
  * Manages inventory of flow rules; not intended for direct use.
  */
-public interface FlowRuleStore extends Store<FlowRuleEvent, FlowRuleStoreDelegate> {
+public interface FlowRuleStore extends Store<FlowRuleBatchEvent, FlowRuleStoreDelegate> {
 
     /**
      * Returns the number of flow rule in the store.
@@ -59,12 +61,26 @@
     Iterable<FlowRule> getFlowRulesByAppId(ApplicationId appId);
 
     /**
+     // TODO: Better description of method behavior.
      * Stores a new flow rule without generating events.
      *
      * @param rule the flow rule to add
-     * @return true if the rule should be handled locally
      */
-    boolean storeFlowRule(FlowRule rule);
+    void storeFlowRule(FlowRule rule);
+
+    /**
+     * Stores a batch of flow rules.
+     * @param batchOperation batch of flow rules.
+     * @return Future response indicating success/failure of the batch operation
+     *     all the way down to the device.
+     */
+    Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation batchOperation);
+
+    /**
+     * Invoked on the completion of a storeBatch operation.
+     * @param result
+     */
+    void batchOperationComplete(FlowRuleBatchEvent event);
 
     /**
      * Marks a flow rule for deletion. Actual deletion will occur
@@ -73,7 +89,7 @@
      * @param rule the flow rule to delete
      * @return true if the rule should be handled locally
      */
-    boolean deleteFlowRule(FlowRule rule);
+    void deleteFlowRule(FlowRule rule);
 
     /**
      * Stores a new flow rule, or updates an existing entry.
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStoreDelegate.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStoreDelegate.java
index 4e5ebf6..fbd6b55 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStoreDelegate.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStoreDelegate.java
@@ -23,5 +23,5 @@
 /**
  * Flow rule store delegate abstraction.
  */
-public interface FlowRuleStoreDelegate extends StoreDelegate<FlowRuleEvent> {
+public interface FlowRuleStoreDelegate extends StoreDelegate<FlowRuleBatchEvent> {
 }
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
index d8f89ae..3ef9fc8 100644
--- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
@@ -5,8 +5,10 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -30,7 +32,9 @@
 import org.onlab.onos.net.flow.FlowRule;
 import org.onlab.onos.net.flow.FlowRuleBatchEntry;
 import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
+import org.onlab.onos.net.flow.FlowRuleBatchEvent;
 import org.onlab.onos.net.flow.FlowRuleBatchOperation;
+import org.onlab.onos.net.flow.FlowRuleBatchRequest;
 import org.onlab.onos.net.flow.FlowRuleEvent;
 import org.onlab.onos.net.flow.FlowRuleListener;
 import org.onlab.onos.net.flow.FlowRuleProvider;
@@ -47,6 +51,9 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 
 /**
  * Provides implementation of the flow NB &amp; SB APIs.
@@ -104,14 +111,7 @@
     public void applyFlowRules(FlowRule... flowRules) {
         for (int i = 0; i < flowRules.length; i++) {
             FlowRule f = flowRules[i];
-            boolean local = store.storeFlowRule(f);
-            if (local) {
-                // TODO: aggregate all local rules and push down once?
-                applyFlowRulesToProviders(f);
-                eventDispatcher.post(
-                        new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED, f));
-
-            }
+            store.storeFlowRule(f);
         }
     }
 
@@ -135,13 +135,7 @@
         FlowRule f;
         for (int i = 0; i < flowRules.length; i++) {
             f = flowRules[i];
-            boolean local = store.deleteFlowRule(f);
-            if (local) {
-                // TODO: aggregate all local rules and push down once?
-                removeFlowRulesFromProviders(f);
-                eventDispatcher.post(
-                        new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVE_REQUESTED, f));
-            }
+            store.deleteFlowRule(f);
         }
     }
 
@@ -185,33 +179,21 @@
     @Override
     public Future<CompletedBatchOperation> applyBatch(
             FlowRuleBatchOperation batch) {
-        Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches =
+        Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches =
                 ArrayListMultimap.create();
         List<Future<CompletedBatchOperation>> futures = Lists.newArrayList();
         for (FlowRuleBatchEntry fbe : batch.getOperations()) {
             final FlowRule f = fbe.getTarget();
-            final Device device = deviceService.getDevice(f.deviceId());
-            final FlowRuleProvider frp = getProvider(device.providerId());
-            batches.put(frp, fbe);
-            switch (fbe.getOperator()) {
-                case ADD:
-                    store.storeFlowRule(f);
-                    break;
-                case REMOVE:
-                    store.deleteFlowRule(f);
-                    break;
-                case MODIFY:
-                default:
-                    log.error("Batch operation type {} unsupported.", fbe.getOperator());
-            }
+            perDeviceBatches.put(f.deviceId(), fbe);
         }
-        for (FlowRuleProvider provider : batches.keySet()) {
+
+        for (DeviceId deviceId : perDeviceBatches.keySet()) {
             FlowRuleBatchOperation b =
-                    new FlowRuleBatchOperation(batches.get(provider));
-            Future<CompletedBatchOperation> future = provider.executeBatch(b);
+                    new FlowRuleBatchOperation(perDeviceBatches.get(deviceId));
+            Future<CompletedBatchOperation> future = store.storeBatch(b);
             futures.add(future);
         }
-        return new FlowRuleBatchFuture(futures, batches);
+        return new FlowRuleBatchFuture(futures, perDeviceBatches);
     }
 
     @Override
@@ -324,6 +306,7 @@
                     post(event);
                 }
             } else {
+                log.info("Removing flow rules....");
                 removeFlowRules(flowEntry);
             }
 
@@ -391,21 +374,47 @@
 
     // Store delegate to re-post events emitted from the store.
     private class InternalStoreDelegate implements FlowRuleStoreDelegate {
+        // TODO: Right now we only dispatch events at individual flowEntry level.
+        // It may be more efficient for also dispatch events as a batch.
         @Override
-        public void notify(FlowRuleEvent event) {
+        public void notify(FlowRuleBatchEvent event) {
+            final FlowRuleBatchRequest request = event.subject();
             switch (event.type()) {
-            case RULE_ADD_REQUESTED:
-                applyFlowRulesToProviders(event.subject());
-                break;
-            case RULE_REMOVE_REQUESTED:
-                removeFlowRulesFromProviders(event.subject());
-                break;
+            case BATCH_OPERATION_REQUESTED:
+                for (FlowEntry entry : request.toAdd()) {
+                    eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED, entry));
+                }
+                for (FlowEntry entry : request.toRemove()) {
+                    eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVE_REQUESTED, entry));
+                }
+                // FIXME: what about op.equals(FlowRuleOperation.MODIFY) ?
 
-            case RULE_ADDED:
-            case RULE_REMOVED:
-            case RULE_UPDATED:
-                // only dispatch events related to switch
-                eventDispatcher.post(event);
+                FlowRuleBatchOperation batchOperation = request.asBatchOperation();
+
+                FlowRuleProvider flowRuleProvider =
+                        getProvider(batchOperation.getOperations().get(0).getTarget().deviceId());
+                final ListenableFuture<CompletedBatchOperation> result =
+                        flowRuleProvider.executeBatch(batchOperation);
+                result.addListener(new Runnable() {
+                    @Override
+                    public void run() {
+                        store.batchOperationComplete(FlowRuleBatchEvent.create(request, Futures.getUnchecked(result)));
+                    }
+                }, Executors.newCachedThreadPool());
+
+                break;
+            case BATCH_OPERATION_COMPLETED:
+                Set<FlowEntry> failedItems = event.result().failedItems();
+                for (FlowEntry entry : request.toAdd()) {
+                    if (!failedItems.contains(entry)) {
+                        eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADDED, entry));
+                    }
+                }
+                for (FlowEntry entry : request.toRemove()) {
+                    if (!failedItems.contains(entry)) {
+                            eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVED, entry));
+                    }
+                }
                 break;
             default:
                 break;
@@ -413,18 +422,15 @@
         }
     }
 
-    private class FlowRuleBatchFuture
-        implements Future<CompletedBatchOperation> {
+    private class FlowRuleBatchFuture implements Future<CompletedBatchOperation> {
 
         private final List<Future<CompletedBatchOperation>> futures;
-        private final Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches;
+        private final Multimap<DeviceId, FlowRuleBatchEntry> batches;
         private final AtomicReference<BatchState> state;
         private CompletedBatchOperation overall;
 
-
-
         public FlowRuleBatchFuture(List<Future<CompletedBatchOperation>> futures,
-                Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches) {
+                Multimap<DeviceId, FlowRuleBatchEntry> batches) {
             this.futures = futures;
             this.batches = batches;
             state = new AtomicReference<FlowRuleManager.BatchState>();
@@ -466,7 +472,7 @@
             }
 
             boolean success = true;
-            List<FlowEntry> failed = Lists.newLinkedList();
+            Set<FlowEntry> failed = Sets.newHashSet();
             CompletedBatchOperation completed;
             for (Future<CompletedBatchOperation> future : futures) {
                 completed = future.get();
@@ -486,7 +492,7 @@
                 return overall;
             }
             boolean success = true;
-            List<FlowEntry> failed = Lists.newLinkedList();
+            Set<FlowEntry> failed = Sets.newHashSet();
             CompletedBatchOperation completed;
             long start = System.nanoTime();
             long end = start + unit.toNanos(timeout);
@@ -500,7 +506,7 @@
             return finalizeBatchOperation(success, failed);
         }
 
-        private boolean validateBatchOperation(List<FlowEntry> failed,
+        private boolean validateBatchOperation(Set<FlowEntry> failed,
                 CompletedBatchOperation completed) {
 
             if (isCancelled()) {
@@ -522,7 +528,7 @@
         }
 
         private CompletedBatchOperation finalizeBatchOperation(boolean success,
-                List<FlowEntry> failed) {
+                Set<FlowEntry> failed) {
             synchronized (this) {
                 if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
                     if (state.get() == BatchState.FINISHED) {
@@ -545,11 +551,6 @@
                     store.storeFlowRule(fbe.getTarget());
                 }
             }
-
         }
     }
-
-
-
-
 }
diff --git a/core/net/src/main/java/org/onlab/onos/net/link/impl/LinkManager.java b/core/net/src/main/java/org/onlab/onos/net/link/impl/LinkManager.java
index e59eb9f..ea1d09c 100644
--- a/core/net/src/main/java/org/onlab/onos/net/link/impl/LinkManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/link/impl/LinkManager.java
@@ -197,14 +197,7 @@
             checkNotNull(linkDescription, LINK_DESC_NULL);
             checkValidity();
 
-            ConnectPoint src = linkDescription.src();
-            ConnectPoint dst = linkDescription.dst();
-            // if we aren't master for the device associated with the ConnectPoint
-            // we probably shouldn't be doing this.
 
-//            if (deviceService.getRole(dst.deviceId()) != MastershipRole.MASTER) {
-//                return;
-//            }
             LinkEvent event = store.createOrUpdateLink(provider().id(),
                                                        linkDescription);
             if (event != null) {
@@ -232,11 +225,7 @@
         public void linksVanished(ConnectPoint connectPoint) {
             checkNotNull(connectPoint, "Connect point cannot be null");
             checkValidity();
-            // if we aren't master for the device associated with the ConnectPoint
-            // we probably shouldn't be doing this.
-            if (deviceService.getRole(connectPoint.deviceId()) != MastershipRole.MASTER) {
-                return;
-            }
+
             log.info("Links for connection point {} vanished", connectPoint);
             // FIXME: This will remove links registered by other providers
             removeLinks(getLinks(connectPoint));
@@ -246,11 +235,7 @@
         public void linksVanished(DeviceId deviceId) {
             checkNotNull(deviceId, DEVICE_ID_NULL);
             checkValidity();
-            // if we aren't master for the device associated with the ConnectPoint
-            // we probably shouldn't be doing this.
-            if (deviceService.getRole(deviceId) != MastershipRole.MASTER) {
-                return;
-            }
+
             log.info("Links for device {} vanished", deviceId);
             removeLinks(getDeviceLinks(deviceId));
         }
diff --git a/core/net/src/main/java/org/onlab/onos/net/statistic/impl/StatisticManager.java b/core/net/src/main/java/org/onlab/onos/net/statistic/impl/StatisticManager.java
index 90db729..edd7db9 100644
--- a/core/net/src/main/java/org/onlab/onos/net/statistic/impl/StatisticManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/statistic/impl/StatisticManager.java
@@ -20,7 +20,6 @@
 import org.onlab.onos.net.statistic.StatisticService;
 import org.onlab.onos.net.statistic.StatisticStore;
 import org.slf4j.Logger;
-
 import java.util.Set;
 
 import static org.slf4j.LoggerFactory.getLogger;
@@ -68,17 +67,52 @@
 
     @Override
     public Link max(Path path) {
-        return null;
+        if (path.links().isEmpty()) {
+            return null;
+        }
+        Load maxLoad = new DefaultLoad();
+        Link maxLink = null;
+        for (Link link : path.links()) {
+            Load load = loadInternal(link.src());
+            if (load.rate() > maxLoad.rate()) {
+                maxLoad = load;
+                maxLink = link;
+            }
+        }
+        return maxLink;
     }
 
     @Override
     public Link min(Path path) {
-        return null;
+        if (path.links().isEmpty()) {
+            return null;
+        }
+        Load minLoad = new DefaultLoad();
+        Link minLink = null;
+        for (Link link : path.links()) {
+            Load load = loadInternal(link.src());
+            if (load.rate() < minLoad.rate()) {
+                minLoad = load;
+                minLink = link;
+            }
+        }
+        return minLink;
     }
 
     @Override
     public FlowRule highestHitter(ConnectPoint connectPoint) {
-        return null;
+        Set<FlowEntry> hitters = statisticStore.getCurrentStatistic(connectPoint);
+        if (hitters.isEmpty()) {
+            return null;
+        }
+
+        FlowEntry max = hitters.iterator().next();
+        for (FlowEntry entry : hitters) {
+            if (entry.bytes() > max.bytes()) {
+                max = entry;
+            }
+        }
+        return max;
     }
 
     private Load loadInternal(ConnectPoint connectPoint) {
@@ -123,16 +157,12 @@
                 case RULE_UPDATED:
                     if (rule instanceof FlowEntry) {
                         statisticStore.addOrUpdateStatistic((FlowEntry) rule);
-                    } else {
-                        log.warn("IT AIN'T A FLOWENTRY");
                     }
                     break;
                 case RULE_ADD_REQUESTED:
-                    log.info("Preparing for stats");
                     statisticStore.prepareForStatistics(rule);
                     break;
                 case RULE_REMOVE_REQUESTED:
-                    log.info("Removing stats");
                     statisticStore.removeFromStatistics(rule);
                     break;
                 case RULE_REMOVED:
diff --git a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
index a2fbc9a..1677af6 100644
--- a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
+++ b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
@@ -1,10 +1,17 @@
 package org.onlab.onos.net.flow.impl;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_ADDED;
+import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
+import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_UPDATED;
 
 
 import static org.onlab.onos.net.flow.FlowRuleEvent.Type.*;
 
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -12,6 +19,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -59,16 +67,7 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-
-import static java.util.Collections.EMPTY_LIST;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_ADDED;
-import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
-import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_UPDATED;
+import com.google.common.util.concurrent.ListenableFuture;
 
 /**
  * Test codifying the flow rule service & flow rule provider service contracts.
@@ -182,7 +181,6 @@
 
     // TODO: If preserving iteration order is a requirement, redo FlowRuleStore.
     //backing store is sensitive to the order of additions/removals
-    @SuppressWarnings("unchecked")
     private boolean validateState(Map<FlowRule, FlowEntryState> expected) {
         Map<FlowRule, FlowEntryState> expectedToCheck = new HashMap<>(expected);
         Iterable<FlowEntry> rules = service.getFlowEntries(DID);
@@ -526,13 +524,13 @@
         }
 
         @Override
-        public Future<CompletedBatchOperation> executeBatch(
+        public ListenableFuture<CompletedBatchOperation> executeBatch(
                 BatchOperation<FlowRuleBatchEntry> batch) {
             return new TestInstallationFuture();
         }
 
         private class TestInstallationFuture
-                implements Future<CompletedBatchOperation> {
+                implements ListenableFuture<CompletedBatchOperation> {
 
             @Override
             public boolean cancel(boolean mayInterruptIfRunning) {
@@ -550,10 +548,9 @@
             }
 
             @Override
-            @SuppressWarnings("unchecked")
             public CompletedBatchOperation get()
                     throws InterruptedException, ExecutionException {
-                return new CompletedBatchOperation(true, EMPTY_LIST);
+                return new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet());
             }
 
             @Override
@@ -562,6 +559,11 @@
                     ExecutionException, TimeoutException {
                 return null;
             }
+
+            @Override
+            public void addListener(Runnable task, Executor executor) {
+                // TODO: add stuff.
+            }
         }
 
     }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index bde57c6..85f928a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -5,10 +5,14 @@
 import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.List;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -19,11 +23,17 @@
 import org.onlab.onos.ApplicationId;
 import org.onlab.onos.cluster.ClusterService;
 import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.flow.CompletedBatchOperation;
 import org.onlab.onos.net.flow.DefaultFlowEntry;
 import org.onlab.onos.net.flow.FlowEntry;
 import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
 import org.onlab.onos.net.flow.FlowRule;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry;
+import org.onlab.onos.net.flow.FlowRuleBatchEvent;
+import org.onlab.onos.net.flow.FlowRuleBatchOperation;
+import org.onlab.onos.net.flow.FlowRuleBatchRequest;
 import org.onlab.onos.net.flow.FlowRuleEvent;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
 import org.onlab.onos.net.flow.FlowRuleEvent.Type;
 import org.onlab.onos.net.flow.FlowRuleStore;
 import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
@@ -43,6 +53,7 @@
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.Futures;
 
 /**
  * Manages inventory of flow rules using a distributed state management protocol.
@@ -50,7 +61,7 @@
 @Component(immediate = true)
 @Service
 public class DistributedFlowRuleStore
-        extends AbstractStore<FlowRuleEvent, FlowRuleStoreDelegate>
+        extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
         implements FlowRuleStore {
 
     private final Logger log = getLogger(getClass());
@@ -92,7 +103,7 @@
             public void handle(ClusterMessage message) {
                 FlowRule rule = SERIALIZER.decode(message.payload());
                 log.info("received add request for {}", rule);
-                storeFlowEntryInternal(rule);
+                storeFlowRule(rule);
                 // FIXME what to respond.
                 try {
                     message.respond(SERIALIZER.encode("ACK"));
@@ -108,7 +119,7 @@
             public void handle(ClusterMessage message) {
                 FlowRule rule = SERIALIZER.decode(message.payload());
                 log.info("received delete request for {}", rule);
-                deleteFlowRuleInternal(rule);
+                deleteFlowRule(rule);
                 // FIXME what to respond.
                 try {
                     message.respond(SERIALIZER.encode("ACK"));
@@ -118,6 +129,22 @@
 
             }
         });
+
+        clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
+
+            @Override
+            public void handle(ClusterMessage message) {
+                FlowRule rule = SERIALIZER.decode(message.payload());
+                log.info("received get flow entry request for {}", rule);
+                FlowEntry flowEntry = getFlowEntryInternal(rule);
+                try {
+                    message.respond(SERIALIZER.encode(flowEntry));
+                } catch (IOException e) {
+                    log.error("Failed to respond back", e);
+                }
+            }
+        });
+
         log.info("Started");
     }
 
@@ -127,6 +154,9 @@
     }
 
 
+    // TODO: This is not a efficient operation on a distributed sharded
+    // flow store. We need to revisit the need for this operation or at least
+    // make it device specific.
     @Override
     public int getFlowRuleCount() {
         return flowEntries.size();
@@ -134,7 +164,26 @@
 
     @Override
     public synchronized FlowEntry getFlowEntry(FlowRule rule) {
-        return getFlowEntryInternal(rule);
+        ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
+        if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
+            return getFlowEntryInternal(rule);
+        }
+
+        log.info("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
+                replicaInfo.master().orNull(), rule.deviceId());
+
+        ClusterMessage message = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                FlowStoreMessageSubjects.GET_FLOW_ENTRY,
+                SERIALIZER.encode(rule));
+
+        try {
+            ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+            return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
+        } catch (IOException | TimeoutException e) {
+            // FIXME: throw a FlowStoreException
+            throw new RuntimeException(e);
+        }
     }
 
     private synchronized StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
@@ -165,19 +214,30 @@
     }
 
     @Override
-    public boolean storeFlowRule(FlowRule rule) {
-        ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
-        if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
-            return storeFlowEntryInternal(rule);
+    public void storeFlowRule(FlowRule rule) {
+        storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule))));
+    }
+
+    public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
+        if (operation.getOperations().isEmpty()) {
+            return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
         }
 
-        log.info("Forwarding storeFlowRule to {}, which is the primary (master) for device {}",
-                replicaInfo.master().orNull(), rule.deviceId());
+        DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
+
+        ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
+
+        if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
+            return storeBatchInternal(operation);
+        }
+
+        log.info("Forwarding storeBatch to {}, which is the primary (master) for device {}",
+                replicaInfo.master().orNull(), deviceId);
 
         ClusterMessage message = new ClusterMessage(
                 clusterService.getLocalNode().id(),
                 FlowStoreMessageSubjects.STORE_FLOW_RULE,
-                SERIALIZER.encode(rule));
+                SERIALIZER.encode(operation));
 
         try {
             ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
@@ -186,58 +246,44 @@
             // FIXME: throw a FlowStoreException
             throw new RuntimeException(e);
         }
-        return false;
+
+        return null;
     }
 
-    private synchronized boolean storeFlowEntryInternal(FlowRule flowRule) {
-        StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
-        DeviceId deviceId = flowRule.deviceId();
-        // write to local copy.
-        if (!flowEntries.containsEntry(deviceId, flowEntry)) {
-            flowEntries.put(deviceId, flowEntry);
-            flowEntriesById.put(flowRule.appId(), flowEntry);
-            notifyDelegate(new FlowRuleEvent(Type.RULE_ADD_REQUESTED, flowRule));
-            return true;
+    private Future<CompletedBatchOperation> storeBatchInternal(FlowRuleBatchOperation operation) {
+        List<FlowEntry> toRemove = new ArrayList<>();
+        List<FlowEntry> toAdd = new ArrayList<>();
+        // TODO: backup changes to hazelcast map
+        for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
+            FlowRule flowRule = batchEntry.getTarget();
+            FlowRuleOperation op = batchEntry.getOperator();
+            if (op.equals(FlowRuleOperation.REMOVE)) {
+                StoredFlowEntry entry = getFlowEntryInternal(flowRule);
+                if (entry != null) {
+                    entry.setState(FlowEntryState.PENDING_REMOVE);
+                }
+                toRemove.add(entry);
+            } else if (op.equals(FlowRuleOperation.ADD)) {
+                StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
+                DeviceId deviceId = flowRule.deviceId();
+                if (!flowEntries.containsEntry(deviceId, flowEntry)) {
+                    flowEntries.put(deviceId, flowEntry);
+                    flowEntriesById.put(flowRule.appId(), flowEntry);
+                    toAdd.add(flowEntry);
+                }
+            }
         }
-        // write to backup.
-        // TODO: write to a hazelcast map.
-        return false;
+        if (toAdd.isEmpty() && toRemove.isEmpty()) {
+            return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
+        }
+        notifyDelegate(FlowRuleBatchEvent.create(new FlowRuleBatchRequest(toAdd, toRemove)));
+        // TODO: imlpement this.
+        return Futures.immediateFailedFuture(new RuntimeException("Implement this."));
     }
 
     @Override
-    public synchronized boolean deleteFlowRule(FlowRule rule) {
-        ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
-        if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
-            return deleteFlowRuleInternal(rule);
-        }
-
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                FlowStoreMessageSubjects.DELETE_FLOW_RULE,
-                SERIALIZER.encode(rule));
-
-        try {
-            ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
-            response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
-        } catch (IOException | TimeoutException e) {
-            // FIXME: throw a FlowStoreException
-            throw new RuntimeException(e);
-        }
-        return false;
-    }
-
-    private synchronized boolean deleteFlowRuleInternal(FlowRule flowRule) {
-        StoredFlowEntry entry = getFlowEntryInternal(flowRule);
-        if (entry == null) {
-            return false;
-        }
-        entry.setState(FlowEntryState.PENDING_REMOVE);
-
-        // TODO: also update backup.
-
-        notifyDelegate(new FlowRuleEvent(Type.RULE_REMOVE_REQUESTED, flowRule));
-
-        return true;
+    public void deleteFlowRule(FlowRule rule) {
+        storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule))));
     }
 
     @Override
@@ -315,4 +361,9 @@
         }
         // TODO: also update backup.
     }
+
+    @Override
+    public void batchOperationComplete(FlowRuleBatchEvent event) {
+        notifyDelegate(event);
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java
index a43dad6..ca833b8 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java
@@ -12,4 +12,5 @@
     public static final MessageSubject ADD_OR_UPDATE_FLOW_RULE =
         new MessageSubject("peer-forward-add-or-update-flow-rule");
     public static final MessageSubject REMOVE_FLOW_RULE = new MessageSubject("peer-forward-remove-flow-rule");
+    public static final MessageSubject GET_FLOW_ENTRY = new MessageSubject("peer-forward-get-flow-entry");
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
index e3d8fe0..ee3fb45 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
@@ -399,7 +399,7 @@
     }
 
     // Auxiliary extension to allow location to mutate.
-    private class StoredHost extends DefaultHost {
+    private static final class StoredHost extends DefaultHost {
         private Timestamped<HostLocation> location;
 
         /**
diff --git a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/RoleValue.java b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/RoleValue.java
index c156143..7447161 100644
--- a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/RoleValue.java
+++ b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/RoleValue.java
@@ -1,7 +1,7 @@
 package org.onlab.onos.store.mastership.impl;
 
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.EnumMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -17,9 +17,9 @@
  * A structure that holds node mastership roles associated with a
  * {@link DeviceId}. This structure needs to be locked through IMap.
  */
-public class RoleValue {
+final class RoleValue {
 
-    protected Map<MastershipRole, List<NodeId>> value = new HashMap<>();
+    protected final Map<MastershipRole, List<NodeId>> value = new EnumMap<>(MastershipRole.class);
 
     public RoleValue() {
         value.put(MastershipRole.MASTER, new LinkedList<NodeId>());
@@ -27,7 +27,8 @@
         value.put(MastershipRole.NONE, new LinkedList<NodeId>());
     }
 
-    public Map<MastershipRole, List<NodeId>> value() {
+    // exposing internals for serialization purpose only
+    Map<MastershipRole, List<NodeId>> value() {
         return Collections.unmodifiableMap(value);
     }
 
diff --git a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/RoleValueSerializer.java b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/RoleValueSerializer.java
index 22d1b35..4450e5b 100644
--- a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/RoleValueSerializer.java
+++ b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/RoleValueSerializer.java
@@ -35,10 +35,10 @@
 
     @Override
     public void write(Kryo kryo, Output output, RoleValue type) {
-        output.writeInt(type.value().size());
+        final Map<MastershipRole, List<NodeId>> map = type.value();
+        output.writeInt(map.size());
 
-        for (Map.Entry<MastershipRole, List<NodeId>> el :
-                type.value().entrySet()) {
+        for (Map.Entry<MastershipRole, List<NodeId>> el : map.entrySet()) {
             output.writeInt(el.getKey().ordinal());
 
             List<NodeId> nodes = el.getValue();
diff --git a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/SMap.java b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/SMap.java
index 93a7b0d..6dd4bfb 100644
--- a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/SMap.java
+++ b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/SMap.java
@@ -492,7 +492,10 @@
     }
 
     private V deserializeVal(byte[] val) {
-        return serializer.decode(val);
+        if (val == null) {
+            return null;
+        }
+        return serializer.decode(val.clone());
     }
 
     private Set<byte[]> serializeKeySet(Set<K> keys) {
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java
index 9b75cea..7fddb01 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java
@@ -33,6 +33,7 @@
 import org.onlab.onos.net.flow.DefaultTrafficTreatment;
 import org.onlab.onos.net.flow.FlowEntry;
 import org.onlab.onos.net.flow.FlowId;
+import org.onlab.onos.net.flow.StoredFlowEntry;
 import org.onlab.onos.net.flow.criteria.Criteria;
 import org.onlab.onos.net.flow.criteria.Criterion;
 import org.onlab.onos.net.flow.instructions.Instructions;
@@ -97,6 +98,8 @@
                     HostId.class,
                     HostDescription.class,
                     DefaultHostDescription.class,
+                    DefaultFlowEntry.class,
+                    StoredFlowEntry.class,
                     DefaultFlowRule.class,
                     DefaultFlowEntry.class,
                     FlowEntry.FlowEntryState.class,
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
index d312af5..bbfc263 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
@@ -3,6 +3,8 @@
 import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
 import static org.slf4j.LoggerFactory.getLogger;
 import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
+
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -10,6 +12,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -17,11 +20,17 @@
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.onos.ApplicationId;
 import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.flow.CompletedBatchOperation;
 import org.onlab.onos.net.flow.DefaultFlowEntry;
 import org.onlab.onos.net.flow.FlowEntry;
 import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
 import org.onlab.onos.net.flow.FlowId;
 import org.onlab.onos.net.flow.FlowRule;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
+import org.onlab.onos.net.flow.FlowRuleBatchEvent;
+import org.onlab.onos.net.flow.FlowRuleBatchOperation;
+import org.onlab.onos.net.flow.FlowRuleBatchRequest;
 import org.onlab.onos.net.flow.FlowRuleEvent;
 import org.onlab.onos.net.flow.FlowRuleEvent.Type;
 import org.onlab.onos.net.flow.FlowRuleStore;
@@ -33,6 +42,7 @@
 
 import com.google.common.base.Function;
 import com.google.common.collect.FluentIterable;
+import com.google.common.util.concurrent.Futures;
 
 /**
  * Manages inventory of flow rules using trivial in-memory implementation.
@@ -40,7 +50,7 @@
 @Component(immediate = true)
 @Service
 public class SimpleFlowRuleStore
-        extends AbstractStore<FlowRuleEvent, FlowRuleStoreDelegate>
+        extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
         implements FlowRuleStore {
 
     private final Logger log = getLogger(getClass());
@@ -148,12 +158,11 @@
     }
 
     @Override
-    public boolean storeFlowRule(FlowRule rule) {
-        final boolean added = storeFlowRuleInternal(rule);
-        return added;
+    public void storeFlowRule(FlowRule rule) {
+        storeFlowRuleInternal(rule);
     }
 
-    private boolean storeFlowRuleInternal(FlowRule rule) {
+    private void storeFlowRuleInternal(FlowRule rule) {
         StoredFlowEntry f = new DefaultFlowEntry(rule);
         final DeviceId did = f.deviceId();
         final FlowId fid = f.id();
@@ -162,19 +171,20 @@
             for (StoredFlowEntry fe : existing) {
                 if (fe.equals(rule)) {
                     // was already there? ignore
-                    return false;
+                    return;
                 }
             }
             // new flow rule added
             existing.add(f);
-            // TODO: Should we notify only if it's "remote" event?
-            //notifyDelegate(new FlowRuleEvent(Type.RULE_ADD_REQUESTED, rule));
-            return true;
+            notifyDelegate(FlowRuleBatchEvent.create(
+                    new FlowRuleBatchRequest(
+                            Arrays.<FlowEntry>asList(f),
+                            Collections.<FlowEntry>emptyList())));
         }
     }
 
     @Override
-    public boolean deleteFlowRule(FlowRule rule) {
+    public void deleteFlowRule(FlowRule rule) {
 
         List<StoredFlowEntry> entries = getFlowEntries(rule.deviceId(), rule.id());
 
@@ -184,14 +194,17 @@
                     synchronized (entry) {
                         entry.setState(FlowEntryState.PENDING_REMOVE);
                         // TODO: Should we notify only if it's "remote" event?
-                        //notifyDelegate(new FlowRuleEvent(Type.RULE_REMOVE_REQUESTED, rule));
-                        return true;
+                        notifyDelegate(FlowRuleBatchEvent.create(
+                                new FlowRuleBatchRequest(
+                                        Collections.<FlowEntry>emptyList(),
+                                        Arrays.<FlowEntry>asList(entry))));
                     }
                 }
             }
         }
+
+
         //log.warn("Cannot find rule {}", rule);
-        return false;
     }
 
     @Override
@@ -237,4 +250,24 @@
         }
         return null;
     }
+
+    @Override
+    public Future<CompletedBatchOperation> storeBatch(
+            FlowRuleBatchOperation batchOperation) {
+        for (FlowRuleBatchEntry entry : batchOperation.getOperations()) {
+            if (entry.getOperator().equals(FlowRuleOperation.ADD)) {
+                storeFlowRule(entry.getTarget());
+            } else if (entry.getOperator().equals(FlowRuleOperation.REMOVE)) {
+                deleteFlowRule(entry.getTarget());
+            } else {
+                throw new UnsupportedOperationException("Unsupported operation type");
+            }
+        }
+        return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
+    }
+
+    @Override
+    public void batchOperationComplete(FlowRuleBatchEvent event) {
+        notifyDelegate(event);
+    }
 }
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleHostStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleHostStore.java
index ef80b72..ee8570d 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleHostStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleHostStore.java
@@ -269,7 +269,7 @@
     }
 
     // Auxiliary extension to allow location to mutate.
-    private class StoredHost extends DefaultHost {
+    private static final class StoredHost extends DefaultHost {
         private HostLocation location;
 
         /**
diff --git a/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OFChannelHandler.java b/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OFChannelHandler.java
index 5047867..4ea2f71 100644
--- a/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OFChannelHandler.java
+++ b/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OFChannelHandler.java
@@ -41,7 +41,6 @@
 import org.projectfloodlight.openflow.protocol.OFHelloElem;
 import org.projectfloodlight.openflow.protocol.OFMessage;
 import org.projectfloodlight.openflow.protocol.OFPacketIn;
-import org.projectfloodlight.openflow.protocol.OFPacketOut;
 import org.projectfloodlight.openflow.protocol.OFPortDescStatsReply;
 import org.projectfloodlight.openflow.protocol.OFPortDescStatsRequest;
 import org.projectfloodlight.openflow.protocol.OFPortStatus;
@@ -661,10 +660,9 @@
          * However, we could be more forgiving
          * @param h the channel handler that received the message
          * @param m the message
-         * @throws SwitchStateException
-         * @throws SwitchStateExeption we always through the execption
+         * @throws SwitchStateException we always throw the exception
          */
-        // needs to be protected because enum members are acutally subclasses
+        // needs to be protected because enum members are actually subclasses
         protected void illegalMessageReceived(OFChannelHandler h, OFMessage m)
                 throws SwitchStateException {
             String msg = getSwitchStateMessage(h, m,
@@ -1025,7 +1023,9 @@
                 // all state for the original switch (with the same dpid),
                 // which we obviously don't want.
                 log.info("{}:removal called", getSwitchInfoString());
-                sw.removeConnectedSwitch();
+                if (sw != null) {
+                    sw.removeConnectedSwitch();
+                }
             } else {
                 // A duplicate was disconnected on this ChannelHandler,
                 // this is the same switch reconnecting, but the original state was
diff --git a/openflow/ctl/src/main/java/org/onlab/onos/openflow/drivers/impl/OFSwitchImplCPqD13.java b/openflow/ctl/src/main/java/org/onlab/onos/openflow/drivers/impl/OFSwitchImplCPqD13.java
index c4c2e19..3d60dfa 100644
--- a/openflow/ctl/src/main/java/org/onlab/onos/openflow/drivers/impl/OFSwitchImplCPqD13.java
+++ b/openflow/ctl/src/main/java/org/onlab/onos/openflow/drivers/impl/OFSwitchImplCPqD13.java
@@ -1188,7 +1188,8 @@
                 .setHardTimeout(0)
                 .setXid(getNextTransactionId())
                 .build();
-        sendMsg(tableMissEntry);
+
+        write(tableMissEntry);
     }
 
     private void sendBarrier(boolean finalBarrier) {
@@ -1200,7 +1201,8 @@
                 .buildBarrierRequest()
                 .setXid(xid)
                 .build();
-        sendMsg(br);
+
+        write(br);
     }
 
     @Override
@@ -1210,7 +1212,7 @@
 
     @Override
     public void write(OFMessage msg) {
-        this.channel.write(msg);
+        this.channel.write(Collections.singletonList(msg));
 
     }
 
diff --git a/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LinkDiscovery.java b/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LinkDiscovery.java
index bf4fee0..e60ed90 100644
--- a/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LinkDiscovery.java
+++ b/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LinkDiscovery.java
@@ -90,7 +90,7 @@
      * Instantiates discovery manager for the given physical switch. Creates a
      * generic LLDP packet that will be customized for the port it is sent out on.
      * Starts the the timer for the discovery process.
-     *  @param device the physical switch
+     * @param device the physical switch
      * @param masterService
      * @param useBDDP flag to also use BDDP for discovery
      */
@@ -217,7 +217,7 @@
             final PortNumber srcPort = PortNumber.portNumber(onoslldp.getPort());
             final DeviceId srcDeviceId = DeviceId.deviceId(onoslldp.getDeviceString());
             final DeviceId dstDeviceId = context.inPacket().receivedFrom().deviceId();
-            this.ackProbe(srcPort.toLong());
+            this.ackProbe(dstPort.toLong());
             ConnectPoint src = new ConnectPoint(srcDeviceId, srcPort);
             ConnectPoint dst = new ConnectPoint(dstDeviceId, dstPort);
 
@@ -245,7 +245,7 @@
      */
     @Override
     public void run(final Timeout t) {
-        this.log.debug("sending probes");
+        this.log.trace("sending probes");
         synchronized (this) {
             final Iterator<Long> fastIterator = this.fastPorts.iterator();
             Long portNumber;
@@ -256,7 +256,7 @@
                         .getAndIncrement();
 
                 if (probeCount < LinkDiscovery.MAX_PROBE_COUNT) {
-                    this.log.debug("sending fast probe to port");
+                    this.log.trace("sending fast probe to port");
                     sendProbes(portNumber);
                 } else {
                     // Update fast and slow ports
@@ -278,7 +278,7 @@
                 Iterator<Long> slowIterator = this.slowPorts.iterator();
                 while (slowIterator.hasNext()) {
                     portNumber = slowIterator.next();
-                    this.log.debug("sending slow probe to port {}", portNumber);
+                    this.log.trace("sending slow probe to port {}", portNumber);
 
                     sendProbes(portNumber);
 
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
index 6fb54e8..8d3c018 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -10,7 +10,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -69,9 +69,11 @@
 import org.slf4j.Logger;
 
 import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ExecutionList;
+import com.google.common.util.concurrent.ListenableFuture;
 
 /**
  * Provider which uses an OpenFlow controller to detect network
@@ -97,6 +99,8 @@
 
     private final InternalFlowProvider listener = new InternalFlowProvider();
 
+    // FIXME: This should be an expiring map to ensure futures that don't have
+    // a future eventually get garbage collected.
     private final Map<Long, InstallationFuture> pendingFutures =
             new ConcurrentHashMap<Long, InstallationFuture>();
 
@@ -169,7 +173,7 @@
     }
 
     @Override
-    public Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
+    public ListenableFuture<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
         final Set<Dpid> sws =
                 Collections.newSetFromMap(new ConcurrentHashMap<Dpid, Boolean>());
         final Map<Long, FlowRuleBatchEntry> fmXids = new HashMap<Long, FlowRuleBatchEntry>();
@@ -330,18 +334,20 @@
 
     }
 
-    private class InstallationFuture implements Future<CompletedBatchOperation> {
+    private class InstallationFuture implements ListenableFuture<CompletedBatchOperation> {
 
         private final Set<Dpid> sws;
         private final AtomicBoolean ok = new AtomicBoolean(true);
         private final Map<Long, FlowRuleBatchEntry> fms;
 
-        private final List<FlowEntry> offendingFlowMods = Lists.newLinkedList();
+        private final Set<FlowEntry> offendingFlowMods = Sets.newHashSet();
 
         private final CountDownLatch countDownLatch;
         private Long pendingXid;
         private BatchState state;
 
+        private final ExecutionList executionList = new ExecutionList();
+
         public InstallationFuture(Set<Dpid> sws, Map<Long, FlowRuleBatchEntry> fmXids) {
             this.state = BatchState.STARTED;
             this.sws = sws;
@@ -350,6 +356,7 @@
         }
 
         public void fail(OFErrorMsg msg, Dpid dpid) {
+
             ok.set(false);
             removeRequirement(dpid);
             FlowEntry fe = null;
@@ -422,6 +429,9 @@
 
         @Override
         public boolean cancel(boolean mayInterruptIfRunning) {
+            if (isDone()) {
+                return false;
+            }
             ok.set(false);
             this.state = BatchState.CANCELLED;
             cleanUp();
@@ -434,7 +444,8 @@
                 }
 
             }
-            return isCancelled();
+            invokeCallbacks();
+            return true;
         }
 
         @Override
@@ -444,14 +455,15 @@
 
         @Override
         public boolean isDone() {
-            return this.state == BatchState.FINISHED;
+            return this.state == BatchState.FINISHED || isCancelled();
         }
 
         @Override
         public CompletedBatchOperation get() throws InterruptedException, ExecutionException {
             countDownLatch.await();
             this.state = BatchState.FINISHED;
-            return new CompletedBatchOperation(ok.get(), offendingFlowMods);
+            CompletedBatchOperation result = new CompletedBatchOperation(ok.get(), offendingFlowMods);
+            return result;
         }
 
         @Override
@@ -460,7 +472,8 @@
                 TimeoutException {
             if (countDownLatch.await(timeout, unit)) {
                 this.state = BatchState.FINISHED;
-                return new CompletedBatchOperation(ok.get(), offendingFlowMods);
+                CompletedBatchOperation result = new CompletedBatchOperation(ok.get(), offendingFlowMods);
+                return result;
             }
             throw new TimeoutException();
         }
@@ -478,10 +491,21 @@
 
         private void removeRequirement(Dpid dpid) {
             countDownLatch.countDown();
+            if (countDownLatch.getCount() == 0) {
+                invokeCallbacks();
+            }
             sws.remove(dpid);
             cleanUp();
         }
 
+        @Override
+        public void addListener(Runnable runnable, Executor executor) {
+            executionList.add(runnable, executor);
+        }
+
+        private void invokeCallbacks() {
+            executionList.execute();
+        }
     }
 
 }
diff --git a/utils/misc/src/main/java/org/onlab/packet/ChassisId.java b/utils/misc/src/main/java/org/onlab/packet/ChassisId.java
index 5b48e63..4555124 100644
--- a/utils/misc/src/main/java/org/onlab/packet/ChassisId.java
+++ b/utils/misc/src/main/java/org/onlab/packet/ChassisId.java
@@ -32,7 +32,7 @@
      * @param value the value to use.
      */
     public ChassisId(String value) {
-        this.value = Long.valueOf(value, 16);
+        this.value = Long.parseLong(value, 16);
     }
 
     /**
diff --git a/utils/misc/src/main/java/org/onlab/packet/DHCP.java b/utils/misc/src/main/java/org/onlab/packet/DHCP.java
index 2a116b1..119faf9 100644
--- a/utils/misc/src/main/java/org/onlab/packet/DHCP.java
+++ b/utils/misc/src/main/java/org/onlab/packet/DHCP.java
@@ -379,7 +379,7 @@
         // 300
         int optionsLength = 0;
         for (final DHCPOption option : this.options) {
-            if (option.getCode() == 0 || option.getCode() == 255) {
+            if (option.getCode() == 0 || option.getCode() == ((byte) 255)) {
                 optionsLength += 1;
             } else {
                 optionsLength += 2 + (0xff & option.getLength());
diff --git a/utils/misc/src/main/java/org/onlab/packet/IPv4.java b/utils/misc/src/main/java/org/onlab/packet/IPv4.java
index 4b9fd66..634ceff 100644
--- a/utils/misc/src/main/java/org/onlab/packet/IPv4.java
+++ b/utils/misc/src/main/java/org/onlab/packet/IPv4.java
@@ -438,7 +438,7 @@
 
         int result = 0;
         for (int i = 0; i < 4; ++i) {
-            result |= Integer.valueOf(octets[i]) << (3 - i) * 8;
+            result |= Integer.parseInt(octets[i]) << (3 - i) * 8;
         }
         return result;
     }
@@ -471,7 +471,7 @@
         int result = 0;
         for (int i = 0; i < 4; ++i) {
             result = ipAddress >> (3 - i) * 8 & 0xff;
-        sb.append(Integer.valueOf(result).toString());
+        sb.append(result);
         if (i != 3) {
             sb.append(".");
         }
diff --git a/utils/misc/src/main/java/org/onlab/util/HexString.java b/utils/misc/src/main/java/org/onlab/util/HexString.java
index db12aa3..2b91d8e 100644
--- a/utils/misc/src/main/java/org/onlab/util/HexString.java
+++ b/utils/misc/src/main/java/org/onlab/util/HexString.java
@@ -14,7 +14,7 @@
      */
     public static String toHexString(final byte[] bytes) {
         int i;
-        StringBuilder ret = new StringBuilder();
+        StringBuilder ret = new StringBuilder(bytes.length * 3 - 1);
         String tmp;
         for (i = 0; i < bytes.length; i++) {
             if (i > 0) {
@@ -31,22 +31,22 @@
 
     public static String toHexString(final long val, final int padTo) {
         char[] arr = Long.toHexString(val).toCharArray();
-        String ret = "";
+        StringBuilder ret = new StringBuilder(padTo * 3 - 1);
         // prepend the right number of leading zeros
         int i = 0;
         for (; i < (padTo * 2 - arr.length); i++) {
-            ret += "0";
+            ret.append('0');
             if ((i % 2) != 0) {
-                ret += ":";
+                ret.append(':');
             }
         }
         for (int j = 0; j < arr.length; j++) {
-            ret += arr[j];
+            ret.append(arr[j]);
             if ((((i + j) % 2) != 0) && (j < (arr.length - 1))) {
-                ret += ":";
+                ret.append(':');
             }
         }
-        return ret;
+        return ret.toString();
     }
 
     public static String toHexString(final long val) {
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 5ef1768..26d835d 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -163,6 +163,7 @@
         handlers.putIfAbsent(type, handler);
     }
 
+    @Override
     public void unregisterHandler(String type) {
         handlers.remove(type);
     }
@@ -242,7 +243,7 @@
         }
     }
 
-    private class WriteTask implements Runnable {
+    private static class WriteTask implements Runnable {
 
         private final InternalMessage message;
         private final Channel channel;