DistributedFlowRuleStore: remote batch support
Change-Id: I373a942697624440e025a8022a13394396058a71
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
index 3ef9fc8..e91bb89 100644
--- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
@@ -2,12 +2,14 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
+import static org.onlab.util.Tools.namedThreads;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -74,6 +76,8 @@
private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
+ private final ExecutorService futureListeners = Executors.newCachedThreadPool(namedThreads("provider-future-listeners"));
+
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowRuleStore store;
@@ -92,6 +96,8 @@
@Deactivate
public void deactivate() {
+ futureListeners.shutdownNow();
+
store.unsetDelegate(delegate);
eventDispatcher.removeSink(FlowRuleEvent.class);
log.info("Stopped");
@@ -398,9 +404,9 @@
result.addListener(new Runnable() {
@Override
public void run() {
- store.batchOperationComplete(FlowRuleBatchEvent.create(request, Futures.getUnchecked(result)));
+ store.batchOperationComplete(FlowRuleBatchEvent.completed(request, Futures.getUnchecked(result)));
}
- }, Executors.newCachedThreadPool());
+ }, futureListeners);
break;
case BATCH_OPERATION_COMPLETED: