DistributedFlowRuleStore
- replace pendingFuture Map with Cache
- remove the future after setting the Future value
Change-Id: I152dfd586350c472dde74a28579536b44761680a
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index dbd2688..30b9008 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -10,7 +10,6 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -58,10 +57,11 @@
import org.slf4j.Logger;
import com.google.common.base.Function;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -99,9 +99,16 @@
private final AtomicInteger localBatchIdGen = new AtomicInteger();
+ // TODO: make this configurable
+ private int pendingFutureTimeoutMinutes = 5;
- // FIXME switch to expiraing map/Cache?
- private Map<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures = Maps.newConcurrentMap();
+ private Cache<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures =
+ CacheBuilder.newBuilder()
+ .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
+ // TODO Explicitly fail the future if expired?
+ //.removalListener(listener)
+ .build();
+
private final ExecutorService futureListeners =
Executors.newCachedThreadPool(namedThreads("flowstore-peer-responders"));
@@ -405,10 +412,12 @@
@Override
public void batchOperationComplete(FlowRuleBatchEvent event) {
+ final Integer batchId = event.subject().batchId();
SettableFuture<CompletedBatchOperation> future
- = pendingFutures.get(event.subject().batchId());
+ = pendingFutures.getIfPresent(batchId);
if (future != null) {
future.set(event.result());
+ pendingFutures.invalidate(batchId);
}
notifyDelegate(event);
}