initial working impl of batch operations
Change-Id: Ie970543dec1104a394c7bcfa6eec24c0538278d6
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
index ce11cea..a9eddd8 100644
--- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
@@ -5,6 +5,10 @@
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -18,8 +22,11 @@
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.device.DeviceService;
+import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowRule;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry;
+import org.onlab.onos.net.flow.FlowRuleBatchOperation;
import org.onlab.onos.net.flow.FlowRuleEvent;
import org.onlab.onos.net.flow.FlowRuleListener;
import org.onlab.onos.net.flow.FlowRuleProvider;
@@ -32,7 +39,9 @@
import org.onlab.onos.net.provider.AbstractProviderService;
import org.slf4j.Logger;
+import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
/**
* Provides implementation of the flow NB & SB APIs.
@@ -131,6 +140,38 @@
}
@Override
+ public Future<CompletedBatchOperation> applyBatch(
+ FlowRuleBatchOperation batch) {
+ Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches =
+ ArrayListMultimap.create();
+ List<Future<Void>> futures = Lists.newArrayList();
+ for (FlowRuleBatchEntry fbe : batch.getOperations()) {
+ final FlowRule f = fbe.getTarget();
+ final Device device = deviceService.getDevice(f.deviceId());
+ final FlowRuleProvider frp = getProvider(device.providerId());
+ batches.put(frp, fbe);
+ switch (fbe.getOperator()) {
+ case ADD:
+ store.storeFlowRule(f);
+ break;
+ case REMOVE:
+ store.deleteFlowRule(f);
+ break;
+ case MODIFY:
+ default:
+ log.error("Batch operation type {} unsupported.", fbe.getOperator());
+ }
+ }
+ for (FlowRuleProvider provider : batches.keySet()) {
+ FlowRuleBatchOperation b =
+ new FlowRuleBatchOperation(batches.get(provider));
+ Future<Void> future = provider.executeBatch(b);
+ futures.add(future);
+ }
+ return new FlowRuleBatchFuture(futures);
+ }
+
+ @Override
public void addListener(FlowRuleListener listener) {
listenerRegistry.addListener(listener);
}
@@ -296,4 +337,63 @@
eventDispatcher.post(event);
}
}
+
+ private class FlowRuleBatchFuture
+ implements Future<CompletedBatchOperation> {
+
+ private final List<Future<Void>> futures;
+
+ public FlowRuleBatchFuture(List<Future<Void>> futures) {
+ this.futures = futures;
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ boolean isDone = true;
+ for (Future<Void> future : futures) {
+ isDone &= future.isDone();
+ }
+ return isDone;
+ }
+
+ @Override
+ public CompletedBatchOperation get() throws InterruptedException,
+ ExecutionException {
+ // TODO Auto-generated method stub
+ for (Future<Void> future : futures) {
+ future.get();
+ }
+ return new CompletedBatchOperation();
+ }
+
+ @Override
+ public CompletedBatchOperation get(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException,
+ TimeoutException {
+ // TODO we should decrement the timeout
+ long start = System.nanoTime();
+ long end = start + unit.toNanos(timeout);
+ for (Future<Void> future : futures) {
+ long now = System.nanoTime();
+ long thisTimeout = end - now;
+ future.get(thisTimeout, TimeUnit.NANOSECONDS);
+ }
+ return new CompletedBatchOperation();
+ }
+
+ }
+
+
}