FlowRule subsystem bugfixes

- RULE_ADDED will be posted when the Flow was confirmed by stats,
  even if they were installed as a batch
- Properly handle batch in Simple store

Change-Id: I0a0e15b29ff9c0d56d5a646e0751511d73c8f552
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
index 49b0c71..b7d26fb 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
@@ -16,8 +16,12 @@
 package org.onlab.onos.store.trivial.impl;
 
 import com.google.common.base.Function;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.FluentIterable;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -43,13 +47,15 @@
 import org.onlab.util.NewConcurrentHashMap;
 import org.slf4j.Logger;
 
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
 import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
@@ -72,6 +78,18 @@
     private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, List<StoredFlowEntry>>>
             flowEntries = new ConcurrentHashMap<>();
 
+    private final AtomicInteger localBatchIdGen = new AtomicInteger();
+
+    // TODO: make this configurable
+    private int pendingFutureTimeoutMinutes = 5;
+
+    private Cache<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures =
+            CacheBuilder.newBuilder()
+                .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
+                // TODO Explicitly fail the future if expired?
+                //.removalListener(listener)
+                .build();
+
     @Activate
     public void activate() {
         log.info("Started");
@@ -173,10 +191,6 @@
             }
             // new flow rule added
             existing.add(f);
-            notifyDelegate(FlowRuleBatchEvent.requested(
-                    new FlowRuleBatchRequest(1, /* FIXME generate something */
-                                             Arrays.<FlowEntry>asList(f),
-                                             Collections.<FlowEntry>emptyList())));
         }
     }
 
@@ -190,11 +204,6 @@
                 if (entry.equals(rule)) {
                     synchronized (entry) {
                         entry.setState(FlowEntryState.PENDING_REMOVE);
-                        // TODO: Should we notify only if it's "remote" event?
-                        notifyDelegate(FlowRuleBatchEvent.requested(
-                                new FlowRuleBatchRequest(1, /* FIXME generate something */
-                                                         Collections.<FlowEntry>emptyList(),
-                                                         Arrays.<FlowEntry>asList(entry))));
                     }
                 }
             }
@@ -251,20 +260,47 @@
     @Override
     public Future<CompletedBatchOperation> storeBatch(
             FlowRuleBatchOperation batchOperation) {
+        List<FlowRule> toAdd = new ArrayList<>();
+        List<FlowRule> toRemove = new ArrayList<>();
         for (FlowRuleBatchEntry entry : batchOperation.getOperations()) {
+            final FlowRule flowRule = entry.getTarget();
             if (entry.getOperator().equals(FlowRuleOperation.ADD)) {
-                storeFlowRule(entry.getTarget());
+                if (!getFlowEntries(flowRule.deviceId(), flowRule.id()).contains(flowRule)) {
+                    storeFlowRule(flowRule);
+                    toAdd.add(flowRule);
+                }
             } else if (entry.getOperator().equals(FlowRuleOperation.REMOVE)) {
-                deleteFlowRule(entry.getTarget());
+                if (getFlowEntries(flowRule.deviceId(), flowRule.id()).contains(flowRule)) {
+                    deleteFlowRule(flowRule);
+                    toRemove.add(flowRule);
+                }
             } else {
                 throw new UnsupportedOperationException("Unsupported operation type");
             }
         }
-        return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
+
+        if (toAdd.isEmpty() && toRemove.isEmpty()) {
+            return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
+        }
+
+        SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
+        final int batchId = localBatchIdGen.incrementAndGet();
+
+        pendingFutures.put(batchId, r);
+        notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove)));
+
+        return r;
     }
 
     @Override
     public void batchOperationComplete(FlowRuleBatchEvent event) {
+        final Integer batchId = event.subject().batchId();
+        SettableFuture<CompletedBatchOperation> future
+            = pendingFutures.getIfPresent(batchId);
+        if (future != null) {
+            future.set(event.result());
+            pendingFutures.invalidate(batchId);
+        }
         notifyDelegate(event);
     }
 }