FlowRule subsystem bugfixes
- RULE_ADDED will be posted when the Flow was confirmed by stats,
even if they were installed as a batch
- Properly handle batch in Simple store
Change-Id: I0a0e15b29ff9c0d56d5a646e0751511d73c8f552
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEvent.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEvent.java
index d5c762d..1dbf8bd 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEvent.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEvent.java
@@ -27,11 +27,14 @@
*/
public enum Type {
+ // Request has been forwarded to MASTER Node
/**
* Signifies that a batch operation has been initiated.
*/
BATCH_OPERATION_REQUESTED,
+ // MASTER Node has pushed the batch down to the Device
+ // (e.g., Received barrier reply)
/**
* Signifies that a batch operation has completed.
*/
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchRequest.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchRequest.java
index 4a2bcf9..f75c663 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchRequest.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchRequest.java
@@ -25,29 +25,29 @@
public class FlowRuleBatchRequest {
private final int batchId;
- private final List<FlowEntry> toAdd;
- private final List<FlowEntry> toRemove;
+ private final List<FlowRule> toAdd;
+ private final List<FlowRule> toRemove;
- public FlowRuleBatchRequest(int batchId, List<? extends FlowEntry> toAdd, List<? extends FlowEntry> toRemove) {
+ public FlowRuleBatchRequest(int batchId, List<? extends FlowRule> toAdd, List<? extends FlowRule> toRemove) {
this.batchId = batchId;
this.toAdd = Collections.unmodifiableList(toAdd);
this.toRemove = Collections.unmodifiableList(toRemove);
}
- public List<FlowEntry> toAdd() {
+ public List<FlowRule> toAdd() {
return toAdd;
}
- public List<FlowEntry> toRemove() {
+ public List<FlowRule> toRemove() {
return toRemove;
}
public FlowRuleBatchOperation asBatchOperation() {
List<FlowRuleBatchEntry> entries = Lists.newArrayList();
- for (FlowEntry e : toAdd) {
+ for (FlowRule e : toAdd) {
entries.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, e));
}
- for (FlowEntry e : toRemove) {
+ for (FlowRule e : toRemove) {
entries.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, e));
}
return new FlowRuleBatchOperation(entries);
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
index 4fe9022..bd7fb94 100644
--- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
@@ -371,10 +371,11 @@
final FlowRuleBatchRequest request = event.subject();
switch (event.type()) {
case BATCH_OPERATION_REQUESTED:
- for (FlowEntry entry : request.toAdd()) {
+ // Request has been forwarded to MASTER Node, and was
+ for (FlowRule entry : request.toAdd()) {
eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED, entry));
}
- for (FlowEntry entry : request.toRemove()) {
+ for (FlowRule entry : request.toRemove()) {
eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVE_REQUESTED, entry));
}
// FIXME: what about op.equals(FlowRuleOperation.MODIFY) ?
@@ -392,21 +393,15 @@
Futures.getUnchecked(result)));
}
}, futureListeners);
+ break;
- break;
case BATCH_OPERATION_COMPLETED:
- Set<FlowRule> failedItems = event.result().failedItems();
- for (FlowEntry entry : request.toAdd()) {
- if (!failedItems.contains(entry)) {
- eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADDED, entry));
- }
- }
- for (FlowEntry entry : request.toRemove()) {
- if (!failedItems.contains(entry)) {
- eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVED, entry));
- }
- }
+ // MASTER Node has pushed the batch down to the Device
+
+ // Note: RULE_ADDED will be posted
+ // when Flow was actually confirmed by stats reply.
break;
+
default:
break;
}
diff --git a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
index b986d6d..f67d992 100644
--- a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
+++ b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
@@ -148,7 +148,7 @@
int i = 0;
System.err.println("events :" + listener.events);
for (FlowRuleEvent e : listener.events) {
- assertTrue("unexpected event", e.type().equals(events[i]));
+ assertEquals("unexpected event", events[i], e.type());
i++;
}
@@ -178,15 +178,13 @@
RULE_ADDED, RULE_ADDED);
addFlowRule(1);
+ System.err.println("events :" + listener.events);
assertEquals("should still be 2 rules", 2, flowCount());
providerService.pushFlowMetrics(DID, ImmutableList.of(fe1));
validateEvents(RULE_UPDATED);
}
-
- // TODO: If preserving iteration order is a requirement, redo FlowRuleStore.
- //backing store is sensitive to the order of additions/removals
private boolean validateState(Map<FlowRule, FlowEntryState> expected) {
Map<FlowRule, FlowEntryState> expectedToCheck = new HashMap<>(expected);
Iterable<FlowEntry> rules = service.getFlowEntries(DID);
@@ -539,17 +537,17 @@
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
- return true;
+ return false;
}
@Override
public boolean isCancelled() {
- return true;
+ return false;
}
@Override
public boolean isDone() {
- return false;
+ return true;
}
@Override
@@ -562,12 +560,14 @@
public CompletedBatchOperation get(long timeout, TimeUnit unit)
throws InterruptedException,
ExecutionException, TimeoutException {
- return null;
+ return new CompletedBatchOperation(true, Collections.<FlowRule>emptySet());
}
@Override
public void addListener(Runnable task, Executor executor) {
- // TODO: add stuff.
+ if (isDone()) {
+ executor.execute(task);
+ }
}
}
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
index 49b0c71..b7d26fb 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
@@ -16,8 +16,12 @@
package org.onlab.onos.store.trivial.impl;
import com.google.common.base.Function;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import com.google.common.collect.FluentIterable;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -43,13 +47,15 @@
import org.onlab.util.NewConcurrentHashMap;
import org.slf4j.Logger;
-import java.util.Arrays;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
@@ -72,6 +78,18 @@
private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, List<StoredFlowEntry>>>
flowEntries = new ConcurrentHashMap<>();
+ private final AtomicInteger localBatchIdGen = new AtomicInteger();
+
+ // TODO: make this configurable
+ private int pendingFutureTimeoutMinutes = 5;
+
+ private Cache<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures =
+ CacheBuilder.newBuilder()
+ .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
+ // TODO Explicitly fail the future if expired?
+ //.removalListener(listener)
+ .build();
+
@Activate
public void activate() {
log.info("Started");
@@ -173,10 +191,6 @@
}
// new flow rule added
existing.add(f);
- notifyDelegate(FlowRuleBatchEvent.requested(
- new FlowRuleBatchRequest(1, /* FIXME generate something */
- Arrays.<FlowEntry>asList(f),
- Collections.<FlowEntry>emptyList())));
}
}
@@ -190,11 +204,6 @@
if (entry.equals(rule)) {
synchronized (entry) {
entry.setState(FlowEntryState.PENDING_REMOVE);
- // TODO: Should we notify only if it's "remote" event?
- notifyDelegate(FlowRuleBatchEvent.requested(
- new FlowRuleBatchRequest(1, /* FIXME generate something */
- Collections.<FlowEntry>emptyList(),
- Arrays.<FlowEntry>asList(entry))));
}
}
}
@@ -251,20 +260,47 @@
@Override
public Future<CompletedBatchOperation> storeBatch(
FlowRuleBatchOperation batchOperation) {
+ List<FlowRule> toAdd = new ArrayList<>();
+ List<FlowRule> toRemove = new ArrayList<>();
for (FlowRuleBatchEntry entry : batchOperation.getOperations()) {
+ final FlowRule flowRule = entry.getTarget();
if (entry.getOperator().equals(FlowRuleOperation.ADD)) {
- storeFlowRule(entry.getTarget());
+ if (!getFlowEntries(flowRule.deviceId(), flowRule.id()).contains(flowRule)) {
+ storeFlowRule(flowRule);
+ toAdd.add(flowRule);
+ }
} else if (entry.getOperator().equals(FlowRuleOperation.REMOVE)) {
- deleteFlowRule(entry.getTarget());
+ if (getFlowEntries(flowRule.deviceId(), flowRule.id()).contains(flowRule)) {
+ deleteFlowRule(flowRule);
+ toRemove.add(flowRule);
+ }
} else {
throw new UnsupportedOperationException("Unsupported operation type");
}
}
- return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
+
+ if (toAdd.isEmpty() && toRemove.isEmpty()) {
+ return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
+ }
+
+ SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
+ final int batchId = localBatchIdGen.incrementAndGet();
+
+ pendingFutures.put(batchId, r);
+ notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove)));
+
+ return r;
}
@Override
public void batchOperationComplete(FlowRuleBatchEvent event) {
+ final Integer batchId = event.subject().batchId();
+ SettableFuture<CompletedBatchOperation> future
+ = pendingFutures.getIfPresent(batchId);
+ if (future != null) {
+ future.set(event.result());
+ pendingFutures.invalidate(batchId);
+ }
notifyDelegate(event);
}
}