Fixing flow rule batches
The problem should now be fixed. Hashing on the enum last is a bad
idea because the enum's value could be 0.
Change-Id: Ib29e90b393b5285be2807729b52e69b121340f09
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
index e996dfc..7e5f049 100644
--- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
@@ -15,22 +15,12 @@
*/
package org.onlab.onos.net.flow.impl;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.slf4j.LoggerFactory.getLogger;
-import static org.onlab.util.Tools.namedThreads;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
-
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -64,14 +54,21 @@
import org.onlab.onos.net.provider.AbstractProviderService;
import org.slf4j.Logger;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onlab.util.Tools.namedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
/**
* Provides implementation of the flow NB & SB APIs.
@@ -92,8 +89,7 @@
private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
- private final ExecutorService futureListeners =
- Executors.newCachedThreadPool(namedThreads("provider-future-listeners"));
+ private ExecutorService futureService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowRuleStore store;
@@ -106,6 +102,7 @@
@Activate
public void activate() {
+ futureService = Executors.newCachedThreadPool(namedThreads("provider-future-listeners"));
store.setDelegate(delegate);
eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
log.info("Started");
@@ -113,7 +110,7 @@
@Deactivate
public void deactivate() {
- futureListeners.shutdownNow();
+ futureService.shutdownNow();
store.unsetDelegate(delegate);
eventDispatcher.removeSink(FlowRuleEvent.class);
@@ -364,6 +361,9 @@
// Store delegate to re-post events emitted from the store.
private class InternalStoreDelegate implements FlowRuleStoreDelegate {
+
+ private static final int TIMEOUT = 5000; // ms
+
// TODO: Right now we only dispatch events at individual flowEntry level.
// It may be more efficient for also dispatch events as a batch.
@Override
@@ -384,15 +384,21 @@
FlowRuleProvider flowRuleProvider =
getProvider(batchOperation.getOperations().get(0).getTarget().deviceId());
- final ListenableFuture<CompletedBatchOperation> result =
+ final Future<CompletedBatchOperation> result =
flowRuleProvider.executeBatch(batchOperation);
- result.addListener(new Runnable() {
+ futureService.submit(new Runnable() {
@Override
public void run() {
- store.batchOperationComplete(FlowRuleBatchEvent.completed(request,
- Futures.getUnchecked(result)));
+ CompletedBatchOperation res = null;
+ try {
+ res = result.get(TIMEOUT, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException | InterruptedException | ExecutionException e) {
+ log.warn("Something went wrong with the batch operation {}",
+ request.batchId());
+ }
+ store.batchOperationComplete(FlowRuleBatchEvent.completed(request, res));
}
- }, futureListeners);
+ });
break;
case BATCH_OPERATION_COMPLETED: