clean batch operations
Change-Id: I7187de40bb5276d6ae9e9831e5d47d36e16560ad
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java b/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
index bde752e..e9889cd 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
@@ -1,6 +1,29 @@
package org.onlab.onos.net.flow;
-public class CompletedBatchOperation {
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+public class CompletedBatchOperation implements BatchOperationResult<FlowEntry> {
+
+
+ private final boolean success;
+ private final List<FlowEntry> failures;
+
+ public CompletedBatchOperation(boolean success, List<FlowEntry> failures) {
+ this.success = success;
+ this.failures = ImmutableList.copyOf(failures);
+ }
+
+ @Override
+ public boolean isSuccess() {
+ return success;
+ }
+
+ @Override
+ public List<FlowEntry> failedItems() {
+ return failures;
+ }
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/DefaultFlowEntry.java b/core/api/src/main/java/org/onlab/onos/net/flow/DefaultFlowEntry.java
index 5a0f55b..d4657d2 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/DefaultFlowEntry.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/DefaultFlowEntry.java
@@ -17,6 +17,10 @@
private long lastSeen = -1;
+ private final int errType;
+
+ private final int errCode;
+
public DefaultFlowEntry(DeviceId deviceId, TrafficSelector selector,
TrafficTreatment treatment, int priority, FlowEntryState state,
@@ -27,6 +31,8 @@
this.life = life;
this.packets = packets;
this.bytes = bytes;
+ this.errCode = -1;
+ this.errType = -1;
this.lastSeen = System.currentTimeMillis();
}
@@ -37,6 +43,8 @@
this.life = life;
this.packets = packets;
this.bytes = bytes;
+ this.errCode = -1;
+ this.errType = -1;
this.lastSeen = System.currentTimeMillis();
}
@@ -46,9 +54,18 @@
this.life = 0;
this.packets = 0;
this.bytes = 0;
+ this.errCode = -1;
+ this.errType = -1;
this.lastSeen = System.currentTimeMillis();
}
+ public DefaultFlowEntry(FlowRule rule, int errType, int errCode) {
+ super(rule);
+ this.state = FlowEntryState.FAILED;
+ this.errType = errType;
+ this.errCode = errCode;
+ }
+
@Override
public long life() {
return life;
@@ -100,6 +117,16 @@
}
@Override
+ public int errType() {
+ return this.errType;
+ }
+
+ @Override
+ public int errCode() {
+ return this.errCode;
+ }
+
+ @Override
public String toString() {
return toStringHelper(this)
.add("rule", super.toString())
@@ -108,4 +135,6 @@
}
+
+
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowEntry.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowEntry.java
index 5b5f89b..882c9df 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowEntry.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowEntry.java
@@ -29,7 +29,12 @@
/**
* Flow has been removed from flow table and can be purged.
*/
- REMOVED
+ REMOVED,
+
+ /**
+ * Indicates that the installation of this flow has failed.
+ */
+ FAILED
}
/**
@@ -95,4 +100,16 @@
*/
void setBytes(long bytes);
+ /**
+ * Indicates the error type.
+ * @return an integer value of the error
+ */
+ int errType();
+
+ /**
+ * Indicates the error code.
+ * @return an integer value of the error
+ */
+ int errCode();
+
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
index 68762ac..3592e39 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
@@ -37,6 +37,12 @@
*/
void removeRulesById(ApplicationId id, FlowRule... flowRules);
- Future<Void> executeBatch(BatchOperation<FlowRuleBatchEntry> batch);
+ /**
+ * Installs a batch of flow rules. Each flowrule is associated to an
+ * operation which results in either addition, removal or modification.
+ * @param batch a batch of flow rules
+ * @return a future indicating the status of this execution
+ */
+ Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch);
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
index a9eddd8..a897cbb 100644
--- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
@@ -5,10 +5,12 @@
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -26,6 +28,7 @@
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRuleBatchEntry;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
import org.onlab.onos.net.flow.FlowRuleBatchOperation;
import org.onlab.onos.net.flow.FlowRuleEvent;
import org.onlab.onos.net.flow.FlowRuleListener;
@@ -52,6 +55,8 @@
extends AbstractProviderRegistry<FlowRuleProvider, FlowRuleProviderService>
implements FlowRuleService, FlowRuleProviderRegistry {
+ enum BatchState { STARTED, FINISHED, CANCELLED };
+
public static final String FLOW_RULE_NULL = "FlowRule cannot be null";
private final Logger log = getLogger(getClass());
@@ -144,7 +149,7 @@
FlowRuleBatchOperation batch) {
Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches =
ArrayListMultimap.create();
- List<Future<Void>> futures = Lists.newArrayList();
+ List<Future<CompletedBatchOperation>> futures = Lists.newArrayList();
for (FlowRuleBatchEntry fbe : batch.getOperations()) {
final FlowRule f = fbe.getTarget();
final Device device = deviceService.getDevice(f.deviceId());
@@ -165,10 +170,10 @@
for (FlowRuleProvider provider : batches.keySet()) {
FlowRuleBatchOperation b =
new FlowRuleBatchOperation(batches.get(provider));
- Future<Void> future = provider.executeBatch(b);
+ Future<CompletedBatchOperation> future = provider.executeBatch(b);
futures.add(future);
}
- return new FlowRuleBatchFuture(futures);
+ return new FlowRuleBatchFuture(futures, batches);
}
@Override
@@ -341,59 +346,140 @@
private class FlowRuleBatchFuture
implements Future<CompletedBatchOperation> {
- private final List<Future<Void>> futures;
+ private final List<Future<CompletedBatchOperation>> futures;
+ private final Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches;
+ private final AtomicReference<BatchState> state;
+ private CompletedBatchOperation overall;
- public FlowRuleBatchFuture(List<Future<Void>> futures) {
+
+
+ public FlowRuleBatchFuture(List<Future<CompletedBatchOperation>> futures,
+ Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches) {
this.futures = futures;
+ this.batches = batches;
+ state = new AtomicReference<FlowRuleManager.BatchState>();
+ state.set(BatchState.STARTED);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
- // TODO Auto-generated method stub
- return false;
+ if (state.get() == BatchState.FINISHED) {
+ return false;
+ }
+ if (!state.compareAndSet(BatchState.STARTED, BatchState.CANCELLED)) {
+ return false;
+ }
+ cleanUpBatch();
+ for (Future<CompletedBatchOperation> f : futures) {
+ f.cancel(mayInterruptIfRunning);
+ }
+ return true;
}
@Override
public boolean isCancelled() {
- // TODO Auto-generated method stub
- return false;
+ return state.get() == BatchState.CANCELLED;
}
@Override
public boolean isDone() {
- boolean isDone = true;
- for (Future<Void> future : futures) {
- isDone &= future.isDone();
- }
- return isDone;
+ return state.get() == BatchState.FINISHED;
}
+
@Override
public CompletedBatchOperation get() throws InterruptedException,
- ExecutionException {
- // TODO Auto-generated method stub
- for (Future<Void> future : futures) {
- future.get();
+ ExecutionException {
+
+ if (isDone()) {
+ return overall;
}
- return new CompletedBatchOperation();
+
+ boolean success = true;
+ List<FlowEntry> failed = Lists.newLinkedList();
+ CompletedBatchOperation completed;
+ for (Future<CompletedBatchOperation> future : futures) {
+ completed = future.get();
+ success = validateBatchOperation(failed, completed, future);
+ }
+
+ return finalizeBatchOperation(success, failed);
+
}
@Override
public CompletedBatchOperation get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException,
TimeoutException {
- // TODO we should decrement the timeout
+
+ if (isDone()) {
+ return overall;
+ }
+ boolean success = true;
+ List<FlowEntry> failed = Lists.newLinkedList();
+ CompletedBatchOperation completed;
long start = System.nanoTime();
long end = start + unit.toNanos(timeout);
- for (Future<Void> future : futures) {
+
+ for (Future<CompletedBatchOperation> future : futures) {
long now = System.nanoTime();
long thisTimeout = end - now;
- future.get(thisTimeout, TimeUnit.NANOSECONDS);
+ completed = future.get(thisTimeout, TimeUnit.NANOSECONDS);
+ success = validateBatchOperation(failed, completed, future);
}
- return new CompletedBatchOperation();
+ return finalizeBatchOperation(success, failed);
}
+ private boolean validateBatchOperation(List<FlowEntry> failed,
+ CompletedBatchOperation completed,
+ Future<CompletedBatchOperation> future) {
+
+ if (isCancelled()) {
+ throw new CancellationException();
+ }
+ if (!completed.isSuccess()) {
+ failed.addAll(completed.failedItems());
+ cleanUpBatch();
+ cancelAllSubBatches();
+ return false;
+ }
+ return true;
+ }
+
+ private void cancelAllSubBatches() {
+ for (Future<CompletedBatchOperation> f : futures) {
+ f.cancel(true);
+ }
+ }
+
+ private CompletedBatchOperation finalizeBatchOperation(boolean success,
+ List<FlowEntry> failed) {
+ synchronized (overall) {
+ if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
+ if (state.get() == BatchState.FINISHED) {
+ return overall;
+ }
+ throw new CancellationException();
+ }
+ overall = new CompletedBatchOperation(success, failed);
+ return overall;
+ }
+ }
+
+ private void cleanUpBatch() {
+ for (FlowRuleBatchEntry fbe : batches.values()) {
+ if (fbe.getOperator() == FlowRuleOperation.ADD ||
+ fbe.getOperator() == FlowRuleOperation.MODIFY) {
+ store.deleteFlowRule(fbe.getTarget());
+ } else if (fbe.getOperator() == FlowRuleOperation.REMOVE) {
+ store.storeFlowRule(fbe.getTarget());
+ }
+ }
+
+ }
}
+
+
}
diff --git a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
index 86f3ddc..5b363da 100644
--- a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
+++ b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
@@ -28,6 +28,7 @@
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.device.DeviceListener;
import org.onlab.onos.net.device.DeviceService;
+import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.DefaultFlowEntry;
import org.onlab.onos.net.flow.DefaultFlowRule;
import org.onlab.onos.net.flow.FlowEntry;
@@ -408,7 +409,7 @@
}
@Override
- public Future<Void> executeBatch(
+ public Future<CompletedBatchOperation> executeBatch(
BatchOperation<FlowRuleBatchEntry> batch) {
// TODO Auto-generated method stub
return null;