Towards a distributed flow rule store
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
index 67e0867..ba37d22 100644
--- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
@@ -5,8 +5,10 @@
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -30,7 +32,9 @@
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRuleBatchEntry;
import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
+import org.onlab.onos.net.flow.FlowRuleBatchEvent;
import org.onlab.onos.net.flow.FlowRuleBatchOperation;
+import org.onlab.onos.net.flow.FlowRuleBatchRequest;
import org.onlab.onos.net.flow.FlowRuleEvent;
import org.onlab.onos.net.flow.FlowRuleListener;
import org.onlab.onos.net.flow.FlowRuleProvider;
@@ -47,6 +51,9 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
/**
* Provides implementation of the flow NB & SB APIs.
@@ -104,11 +111,7 @@
public void applyFlowRules(FlowRule... flowRules) {
for (int i = 0; i < flowRules.length; i++) {
FlowRule f = flowRules[i];
- boolean local = store.storeFlowRule(f);
- if (local) {
- // TODO: aggregate all local rules and push down once?
- applyFlowRulesToProviders(f);
- }
+ store.storeFlowRule(f);
}
}
@@ -132,11 +135,7 @@
FlowRule f;
for (int i = 0; i < flowRules.length; i++) {
f = flowRules[i];
- boolean local = store.deleteFlowRule(f);
- if (local) {
- // TODO: aggregate all local rules and push down once?
- removeFlowRulesFromProviders(f);
- }
+ store.deleteFlowRule(f);
}
}
@@ -180,33 +179,21 @@
@Override
public Future<CompletedBatchOperation> applyBatch(
FlowRuleBatchOperation batch) {
- Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches =
+ Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches =
ArrayListMultimap.create();
List<Future<CompletedBatchOperation>> futures = Lists.newArrayList();
for (FlowRuleBatchEntry fbe : batch.getOperations()) {
final FlowRule f = fbe.getTarget();
- final Device device = deviceService.getDevice(f.deviceId());
- final FlowRuleProvider frp = getProvider(device.providerId());
- batches.put(frp, fbe);
- switch (fbe.getOperator()) {
- case ADD:
- store.storeFlowRule(f);
- break;
- case REMOVE:
- store.deleteFlowRule(f);
- break;
- case MODIFY:
- default:
- log.error("Batch operation type {} unsupported.", fbe.getOperator());
- }
+ perDeviceBatches.put(f.deviceId(), fbe);
}
- for (FlowRuleProvider provider : batches.keySet()) {
+
+ for (DeviceId deviceId : perDeviceBatches.keySet()) {
FlowRuleBatchOperation b =
- new FlowRuleBatchOperation(batches.get(provider));
- Future<CompletedBatchOperation> future = provider.executeBatch(b);
+ new FlowRuleBatchOperation(perDeviceBatches.get(deviceId));
+ Future<CompletedBatchOperation> future = store.storeBatch(b);
futures.add(future);
}
- return new FlowRuleBatchFuture(futures, batches);
+ return new FlowRuleBatchFuture(futures, perDeviceBatches);
}
@Override
@@ -318,6 +305,7 @@
post(event);
}
} else {
+ log.info("Removing flow rules....");
removeFlowRules(flowEntry);
}
@@ -385,21 +373,47 @@
// Store delegate to re-post events emitted from the store.
private class InternalStoreDelegate implements FlowRuleStoreDelegate {
+ // TODO: Right now we only dispatch events at individual flowEntry level.
+ // It may be more efficient for also dispatch events as a batch.
@Override
- public void notify(FlowRuleEvent event) {
+ public void notify(FlowRuleBatchEvent event) {
+ final FlowRuleBatchRequest request = event.subject();
switch (event.type()) {
- case RULE_ADD_REQUESTED:
- applyFlowRulesToProviders(event.subject());
- break;
- case RULE_REMOVE_REQUESTED:
- removeFlowRulesFromProviders(event.subject());
- break;
+ case BATCH_OPERATION_REQUESTED:
+// for (FlowEntry entry : request.toAdd()) {
+// //eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED, entry));
+// }
+// for (FlowEntry entry : request.toRemove()) {
+// //eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVE_REQUESTED, entry));
+// }
+// // FIXME: what about op.equals(FlowRuleOperation.MODIFY) ?
+//
+ FlowRuleBatchOperation batchOperation = request.asBatchOperation();
- case RULE_ADDED:
- case RULE_REMOVED:
- case RULE_UPDATED:
- // only dispatch events related to switch
- eventDispatcher.post(event);
+ FlowRuleProvider flowRuleProvider =
+ getProvider(batchOperation.getOperations().get(0).getTarget().deviceId());
+ final ListenableFuture<CompletedBatchOperation> result =
+ flowRuleProvider.executeBatch(batchOperation);
+ result.addListener(new Runnable() {
+ @Override
+ public void run() {
+ store.batchOperationComplete(FlowRuleBatchEvent.create(request, Futures.getUnchecked(result)));
+ }
+ }, Executors.newCachedThreadPool());
+
+ break;
+ case BATCH_OPERATION_COMPLETED:
+ Set<FlowEntry> failedItems = event.result().failedItems();
+ for (FlowEntry entry : request.toAdd()) {
+ if (!failedItems.contains(entry)) {
+ eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADDED, entry));
+ }
+ }
+ for (FlowEntry entry : request.toRemove()) {
+ if (!failedItems.contains(entry)) {
+ eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVED, entry));
+ }
+ }
break;
default:
break;
@@ -407,18 +421,15 @@
}
}
- private class FlowRuleBatchFuture
- implements Future<CompletedBatchOperation> {
+ private class FlowRuleBatchFuture implements Future<CompletedBatchOperation> {
private final List<Future<CompletedBatchOperation>> futures;
- private final Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches;
+ private final Multimap<DeviceId, FlowRuleBatchEntry> batches;
private final AtomicReference<BatchState> state;
private CompletedBatchOperation overall;
-
-
public FlowRuleBatchFuture(List<Future<CompletedBatchOperation>> futures,
- Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches) {
+ Multimap<DeviceId, FlowRuleBatchEntry> batches) {
this.futures = futures;
this.batches = batches;
state = new AtomicReference<FlowRuleManager.BatchState>();
@@ -460,7 +471,7 @@
}
boolean success = true;
- List<FlowEntry> failed = Lists.newLinkedList();
+ Set<FlowEntry> failed = Sets.newHashSet();
CompletedBatchOperation completed;
for (Future<CompletedBatchOperation> future : futures) {
completed = future.get();
@@ -480,7 +491,7 @@
return overall;
}
boolean success = true;
- List<FlowEntry> failed = Lists.newLinkedList();
+ Set<FlowEntry> failed = Sets.newHashSet();
CompletedBatchOperation completed;
long start = System.nanoTime();
long end = start + unit.toNanos(timeout);
@@ -494,7 +505,7 @@
return finalizeBatchOperation(success, failed);
}
- private boolean validateBatchOperation(List<FlowEntry> failed,
+ private boolean validateBatchOperation(Set<FlowEntry> failed,
CompletedBatchOperation completed) {
if (isCancelled()) {
@@ -516,7 +527,7 @@
}
private CompletedBatchOperation finalizeBatchOperation(boolean success,
- List<FlowEntry> failed) {
+ Set<FlowEntry> failed) {
synchronized (this) {
if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
if (state.get() == BatchState.FINISHED) {
@@ -539,11 +550,6 @@
store.storeFlowRule(fbe.getTarget());
}
}
-
}
}
-
-
-
-
}