DistributedFlowRuleStore

- replace pendingFuture Map with Cache
- remove the future after setting the Future value

Change-Id: I152dfd586350c472dde74a28579536b44761680a
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index dbd2688..30b9008 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -10,7 +10,6 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -58,10 +57,11 @@
 import org.slf4j.Logger;
 
 import com.google.common.base.Function;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -99,9 +99,16 @@
 
     private final AtomicInteger localBatchIdGen = new AtomicInteger();
 
+    // TODO: make this configurable
+    private int pendingFutureTimeoutMinutes = 5;
 
-    // FIXME switch to expiraing map/Cache?
-    private Map<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures = Maps.newConcurrentMap();
+    private Cache<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures =
+            CacheBuilder.newBuilder()
+                .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
+                // TODO Explicitly fail the future if expired?
+                //.removalListener(listener)
+                .build();
+
 
     private final ExecutorService futureListeners =
             Executors.newCachedThreadPool(namedThreads("flowstore-peer-responders"));
@@ -405,10 +412,12 @@
 
     @Override
     public void batchOperationComplete(FlowRuleBatchEvent event) {
+        final Integer batchId = event.subject().batchId();
         SettableFuture<CompletedBatchOperation> future
-            = pendingFutures.get(event.subject().batchId());
+            = pendingFutures.getIfPresent(batchId);
         if (future != null) {
             future.set(event.result());
+            pendingFutures.invalidate(batchId);
         }
         notifyDelegate(event);
     }