Updating Intent Manager to deal with failures.

Added ids to Flow batch futures.
Adding some basic unit tests for IntentManager
Adding failedIds to the completedOperation in FlowRuleManager

Change-Id: I7645cead193299f70d319d254cd1e82d96909e7b
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index d6d06d0..6ab5e12 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -340,7 +340,8 @@
     public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
 
         if (operation.getOperations().isEmpty()) {
-            return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
+            return Futures.immediateFuture(new CompletedBatchOperation(true,
+                                                                       Collections.<FlowRule>emptySet()));
         }
 
         DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
@@ -379,8 +380,8 @@
     private ListenableFuture<CompletedBatchOperation>
                         storeBatchInternal(FlowRuleBatchOperation operation) {
 
-        final List<StoredFlowEntry> toRemove = new ArrayList<>();
-        final List<StoredFlowEntry> toAdd = new ArrayList<>();
+        final List<FlowRuleBatchEntry> toRemove = new ArrayList<>();
+        final List<FlowRuleBatchEntry> toAdd = new ArrayList<>();
         DeviceId did = null;
 
 
@@ -396,14 +397,14 @@
                     StoredFlowEntry entry = getFlowEntryInternal(flowRule);
                     if (entry != null) {
                         entry.setState(FlowEntryState.PENDING_REMOVE);
-                        toRemove.add(entry);
+                        toRemove.add(batchEntry);
                     }
                 } else if (op.equals(FlowRuleOperation.ADD)) {
                     StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
                     DeviceId deviceId = flowRule.deviceId();
                     if (!flowEntries.containsEntry(deviceId, flowEntry)) {
                         flowEntries.put(deviceId, flowEntry);
-                        toAdd.add(flowEntry);
+                        toAdd.add(batchEntry);
                     }
                 }
             }
@@ -427,8 +428,8 @@
     }
 
     private void updateBackup(final DeviceId deviceId,
-                              final List<StoredFlowEntry> toAdd,
-                              final List<? extends FlowRule> list) {
+                              final List<FlowRuleBatchEntry> toAdd,
+                              final List<FlowRuleBatchEntry> list) {
 
         Future<?> submit = backupExecutors.submit(new UpdateBackup(deviceId, toAdd, list));
 
@@ -442,8 +443,9 @@
         }
     }
 
-    private void updateBackup(DeviceId deviceId, List<StoredFlowEntry> toAdd) {
-        updateBackup(deviceId, toAdd, Collections.<FlowEntry>emptyList());
+    private void updateBackup(DeviceId deviceId, List<FlowRuleBatchEntry> toAdd) {
+
+        updateBackup(deviceId, toAdd, Collections.<FlowRuleBatchEntry>emptyList());
     }
 
     @Override
@@ -477,8 +479,9 @@
                 stored.setPackets(rule.packets());
                 if (stored.state() == FlowEntryState.PENDING_ADD) {
                     stored.setState(FlowEntryState.ADDED);
-                    // update backup.
-                    updateBackup(did, Arrays.asList(stored));
+                    FlowRuleBatchEntry entry =
+                            new FlowRuleBatchEntry(FlowRuleOperation.ADD, stored);
+                    updateBackup(did, Arrays.asList(entry));
                     return new FlowRuleEvent(Type.RULE_ADDED, rule);
                 }
                 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
@@ -515,7 +518,9 @@
         try {
             // This is where one could mark a rule as removed and still keep it in the store.
             final boolean removed = flowEntries.remove(deviceId, rule);
-            updateBackup(deviceId, Collections.<StoredFlowEntry>emptyList(), Arrays.asList(rule));
+            FlowRuleBatchEntry entry =
+                    new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule);
+            updateBackup(deviceId, Collections.<FlowRuleBatchEntry>emptyList(), Arrays.asList(entry));
             if (removed) {
                 return new FlowRuleEvent(RULE_REMOVED, rule);
             } else {
@@ -687,15 +692,17 @@
     }
 
     // Task to update FlowEntries in backup HZ store
+    // TODO: Refactor to take a single list of batch entries instead of
+    //       separate toAdd and toRemove lists.
     private final class UpdateBackup implements Runnable {
 
         private final DeviceId deviceId;
-        private final List<StoredFlowEntry> toAdd;
-        private final List<? extends FlowRule> toRemove;
+        private final List<FlowRuleBatchEntry> toAdd;
+        private final List<FlowRuleBatchEntry> toRemove;
 
         public UpdateBackup(DeviceId deviceId,
-                             List<StoredFlowEntry> toAdd,
-                             List<? extends FlowRule> list) {
+                             List<FlowRuleBatchEntry> toAdd,
+                             List<FlowRuleBatchEntry> list) {
             this.deviceId = checkNotNull(deviceId);
             this.toAdd = checkNotNull(toAdd);
             this.toRemove = checkNotNull(list);
@@ -707,7 +714,8 @@
                 log.trace("update backup {} +{} -{}", deviceId, toAdd, toRemove);
                 final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(deviceId);
                 // Following should be rewritten using async APIs
-                for (StoredFlowEntry entry : toAdd) {
+                for (FlowRuleBatchEntry bEntry : toAdd) {
+                    final FlowRule entry = bEntry.getTarget();
                     final FlowId id = entry.id();
                     ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
                     List<StoredFlowEntry> list = new ArrayList<>();
@@ -715,8 +723,8 @@
                         list.addAll(original);
                     }
 
-                    list.remove(entry);
-                    list.add(entry);
+                    list.remove(bEntry.getTarget());
+                    list.add((StoredFlowEntry) entry);
 
                     ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
                     boolean success;
@@ -730,7 +738,8 @@
                         log.error("Updating backup failed.");
                     }
                 }
-                for (FlowRule entry : toRemove) {
+                for (FlowRuleBatchEntry bEntry : toRemove) {
+                    final FlowRule entry = bEntry.getTarget();
                     final FlowId id = entry.id();
                     ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
                     List<StoredFlowEntry> list = new ArrayList<>();
@@ -738,7 +747,7 @@
                         list.addAll(original);
                     }
 
-                    list.remove(entry);
+                    list.remove(bEntry.getTarget());
 
                     ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
                     boolean success;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentBatchQueue.java b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentBatchQueue.java
index 4c146d1..68cda00 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentBatchQueue.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentBatchQueue.java
@@ -25,7 +25,6 @@
 import org.onlab.onos.net.intent.IntentOperations;
 import org.slf4j.Logger;
 
-import java.util.Collection;
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.Set;
@@ -82,10 +81,17 @@
     }
 
     @Override
-    public Set<IntentOperations> getIntentOperations() {
-        Set<IntentOperations> set = Sets.newHashSet(currentBatches);
-        set.addAll((Collection) pendingBatches);
-        return set;
+    public Set<IntentOperations> getPendingOperations() {
+        synchronized (this) {
+            return Sets.newHashSet(pendingBatches);
+        }
+    }
+
+    @Override
+    public Set<IntentOperations> getCurrentOperations() {
+        synchronized (this) {
+            return Sets.newHashSet(currentBatches);
+        }
     }
 
     @Override
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
index 622e2de..4cd31d5 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
@@ -263,19 +263,19 @@
     @Override
     public Future<CompletedBatchOperation> storeBatch(
             FlowRuleBatchOperation batchOperation) {
-        List<FlowRule> toAdd = new ArrayList<>();
-        List<FlowRule> toRemove = new ArrayList<>();
+        List<FlowRuleBatchEntry> toAdd = new ArrayList<>();
+        List<FlowRuleBatchEntry> toRemove = new ArrayList<>();
         for (FlowRuleBatchEntry entry : batchOperation.getOperations()) {
             final FlowRule flowRule = entry.getTarget();
             if (entry.getOperator().equals(FlowRuleOperation.ADD)) {
                 if (!getFlowEntries(flowRule.deviceId(), flowRule.id()).contains(flowRule)) {
                     storeFlowRule(flowRule);
-                    toAdd.add(flowRule);
+                    toAdd.add(entry);
                 }
             } else if (entry.getOperator().equals(FlowRuleOperation.REMOVE)) {
                 if (getFlowEntries(flowRule.deviceId(), flowRule.id()).contains(flowRule)) {
                     deleteFlowRule(flowRule);
-                    toRemove.add(flowRule);
+                    toRemove.add(entry);
                 }
             } else {
                 throw new UnsupportedOperationException("Unsupported operation type");
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleIntentBatchQueue.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleIntentBatchQueue.java
index b8d22bf..f92eca0 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleIntentBatchQueue.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleIntentBatchQueue.java
@@ -15,7 +15,7 @@
  */
 package org.onlab.onos.store.trivial.impl;
 
-import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -25,7 +25,8 @@
 import org.onlab.onos.net.intent.IntentOperations;
 import org.slf4j.Logger;
 
-import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Queue;
 import java.util.Set;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -37,7 +38,8 @@
 public class SimpleIntentBatchQueue implements IntentBatchService {
 
     private final Logger log = getLogger(getClass());
-    private final Set<IntentOperations> pendingBatches = new HashSet<>();
+    private final Queue<IntentOperations> pendingBatches = new LinkedList<>();
+    private final Set<IntentOperations> currentBatches = Sets.newHashSet();
     private IntentBatchDelegate delegate;
 
     @Activate
@@ -53,18 +55,42 @@
     @Override
     public void addIntentOperations(IntentOperations operations) {
         checkState(delegate != null, "No delegate set");
-        pendingBatches.add(operations);
-        delegate.execute(operations);
+        synchronized (this) {
+            pendingBatches.add(operations);
+            if (currentBatches.isEmpty()) {
+                IntentOperations work = pendingBatches.poll();
+                currentBatches.add(work);
+                delegate.execute(work);
+            }
+        }
     }
 
     @Override
     public void removeIntentOperations(IntentOperations operations) {
-        pendingBatches.remove(operations);
+        // we allow at most one outstanding batch at a time
+        synchronized (this) {
+            checkState(currentBatches.remove(operations), "Operations not found in current ops.");
+            checkState(currentBatches.isEmpty(), "More than one outstanding batch.");
+            IntentOperations work = pendingBatches.poll();
+            if (work != null) {
+                currentBatches.add(work);
+                delegate.execute(work);
+            }
+        }
     }
 
     @Override
-    public Set<IntentOperations> getIntentOperations() {
-        return ImmutableSet.copyOf(pendingBatches);
+    public Set<IntentOperations> getPendingOperations() {
+        synchronized (this) {
+            return Sets.newHashSet(pendingBatches);
+        }
+    }
+
+    @Override
+    public Set<IntentOperations> getCurrentOperations() {
+        synchronized (this) {
+            return Sets.newHashSet(currentBatches);
+        }
     }
 
     @Override