DistributedFlowRuleStore: remote batch support

Change-Id: I373a942697624440e025a8022a13394396058a71
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
index 3ef9fc8..e91bb89 100644
--- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
@@ -2,12 +2,14 @@
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.slf4j.LoggerFactory.getLogger;
+import static org.onlab.util.Tools.namedThreads;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -74,6 +76,8 @@
 
     private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
 
+    private final ExecutorService futureListeners = Executors.newCachedThreadPool(namedThreads("provider-future-listeners"));
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected FlowRuleStore store;
 
@@ -92,6 +96,8 @@
 
     @Deactivate
     public void deactivate() {
+        futureListeners.shutdownNow();
+
         store.unsetDelegate(delegate);
         eventDispatcher.removeSink(FlowRuleEvent.class);
         log.info("Stopped");
@@ -398,9 +404,9 @@
                 result.addListener(new Runnable() {
                     @Override
                     public void run() {
-                        store.batchOperationComplete(FlowRuleBatchEvent.create(request, Futures.getUnchecked(result)));
+                        store.batchOperationComplete(FlowRuleBatchEvent.completed(request, Futures.getUnchecked(result)));
                     }
-                }, Executors.newCachedThreadPool());
+                }, futureListeners);
 
                 break;
             case BATCH_OPERATION_COMPLETED: