FlowRule : handle Future failure and timeouts.

Change-Id: Ie945b7ee936ae48ec3205592c309baebe8538ce0
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
index 7e5f049..2e53252 100644
--- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
@@ -21,6 +21,7 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -54,6 +55,7 @@
 import org.onlab.onos.net.provider.AbstractProviderService;
 import org.slf4j.Logger;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -389,14 +391,21 @@
                 futureService.submit(new Runnable() {
                     @Override
                     public void run() {
-                        CompletedBatchOperation res = null;
+                        CompletedBatchOperation res;
                         try {
                             res = result.get(TIMEOUT, TimeUnit.MILLISECONDS);
+                            store.batchOperationComplete(FlowRuleBatchEvent.completed(request, res));
                         } catch (TimeoutException | InterruptedException | ExecutionException e) {
                             log.warn("Something went wrong with the batch operation {}",
-                                     request.batchId());
+                                     request.batchId(), e);
+
+                            Set<FlowRule> failures = new HashSet<>(batchOperation.size());
+                            for (FlowRuleBatchEntry op : batchOperation.getOperations()) {
+                                failures.add(op.getTarget());
+                            }
+                            res = new CompletedBatchOperation(false, failures);
+                            store.batchOperationComplete(FlowRuleBatchEvent.completed(request, res));
                         }
-                        store.batchOperationComplete(FlowRuleBatchEvent.completed(request, res));
                     }
                 });
                 break;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index f65da3f..c94b6c0 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -85,6 +85,8 @@
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
@@ -132,8 +134,7 @@
     private Cache<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures =
             CacheBuilder.newBuilder()
                 .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
-                // TODO Explicitly fail the future if expired?
-                //.removalListener(listener)
+                .removalListener(new TimeoutFuture())
                 .build();
 
     // Cache of SMaps used for backup data.  each SMap contain device flow table
@@ -541,6 +542,17 @@
         log.debug("removedFromPrimary {}", removed);
     }
 
+    private static final class TimeoutFuture
+        implements RemovalListener<Integer, SettableFuture<CompletedBatchOperation>> {
+        @Override
+        public void onRemoval(RemovalNotification<Integer, SettableFuture<CompletedBatchOperation>> notification) {
+            // wrapping in ExecutionException to support Future.get
+            notification.getValue()
+                .setException(new ExecutionException("Timed out",
+                                                     new TimeoutException()));
+        }
+    }
+
     private final class OnStoreBatch implements ClusterMessageHandler {
         private final NodeId local;
 
@@ -580,7 +592,18 @@
 
                 @Override
                 public void run() {
-                     CompletedBatchOperation result = Futures.getUnchecked(f);
+                    CompletedBatchOperation result;
+                    try {
+                        result = f.get();
+                    } catch (InterruptedException | ExecutionException e) {
+                        log.error("Batch operation failed", e);
+                        // create everything failed response
+                        Set<FlowRule> failures = new HashSet<>(operation.size());
+                        for (FlowRuleBatchEntry op : operation.getOperations()) {
+                            failures.add(op.getTarget());
+                        }
+                        result = new CompletedBatchOperation(false, failures);
+                    }
                     try {
                         message.respond(SERIALIZER.encode(result));
                     } catch (IOException e) {
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
index b7d26fb..622e2de 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
@@ -18,6 +18,8 @@
 import com.google.common.base.Function;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import com.google.common.collect.FluentIterable;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
@@ -53,8 +55,10 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
@@ -86,8 +90,7 @@
     private Cache<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures =
             CacheBuilder.newBuilder()
                 .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
-                // TODO Explicitly fail the future if expired?
-                //.removalListener(listener)
+                .removalListener(new TimeoutFuture())
                 .build();
 
     @Activate
@@ -303,4 +306,15 @@
         }
         notifyDelegate(event);
     }
+
+    private static final class TimeoutFuture
+        implements RemovalListener<Integer, SettableFuture<CompletedBatchOperation>> {
+        @Override
+        public void onRemoval(RemovalNotification<Integer, SettableFuture<CompletedBatchOperation>> notification) {
+            // wrapping in ExecutionException to support Future.get
+            notification.getValue()
+                .setException(new ExecutionException("Timed out",
+                                                     new TimeoutException()));
+        }
+    }
 }