FlowRule : handle Future failure and timeouts.
Change-Id: Ie945b7ee936ae48ec3205592c309baebe8538ce0
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index f65da3f..c94b6c0 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -85,6 +85,8 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
@@ -132,8 +134,7 @@
private Cache<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures =
CacheBuilder.newBuilder()
.expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
- // TODO Explicitly fail the future if expired?
- //.removalListener(listener)
+ .removalListener(new TimeoutFuture())
.build();
// Cache of SMaps used for backup data. each SMap contain device flow table
@@ -541,6 +542,17 @@
log.debug("removedFromPrimary {}", removed);
}
+ private static final class TimeoutFuture
+ implements RemovalListener<Integer, SettableFuture<CompletedBatchOperation>> {
+ @Override
+ public void onRemoval(RemovalNotification<Integer, SettableFuture<CompletedBatchOperation>> notification) {
+ // wrapping in ExecutionException to support Future.get
+ notification.getValue()
+ .setException(new ExecutionException("Timed out",
+ new TimeoutException()));
+ }
+ }
+
private final class OnStoreBatch implements ClusterMessageHandler {
private final NodeId local;
@@ -580,7 +592,18 @@
@Override
public void run() {
- CompletedBatchOperation result = Futures.getUnchecked(f);
+ CompletedBatchOperation result;
+ try {
+ result = f.get();
+ } catch (InterruptedException | ExecutionException e) {
+ log.error("Batch operation failed", e);
+ // create everything failed response
+ Set<FlowRule> failures = new HashSet<>(operation.size());
+ for (FlowRuleBatchEntry op : operation.getOperations()) {
+ failures.add(op.getTarget());
+ }
+ result = new CompletedBatchOperation(false, failures);
+ }
try {
message.respond(SERIALIZER.encode(result));
} catch (IOException e) {
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
index b7d26fb..622e2de 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
@@ -18,6 +18,8 @@
import com.google.common.base.Function;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
import com.google.common.collect.FluentIterable;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
@@ -53,8 +55,10 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
@@ -86,8 +90,7 @@
private Cache<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures =
CacheBuilder.newBuilder()
.expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
- // TODO Explicitly fail the future if expired?
- //.removalListener(listener)
+ .removalListener(new TimeoutFuture())
.build();
@Activate
@@ -303,4 +306,15 @@
}
notifyDelegate(event);
}
+
+ private static final class TimeoutFuture
+ implements RemovalListener<Integer, SettableFuture<CompletedBatchOperation>> {
+ @Override
+ public void onRemoval(RemovalNotification<Integer, SettableFuture<CompletedBatchOperation>> notification) {
+ // wrapping in ExecutionException to support Future.get
+ notification.getValue()
+ .setException(new ExecutionException("Timed out",
+ new TimeoutException()));
+ }
+ }
}