initial working impl of batch operations
Change-Id: Ie970543dec1104a394c7bcfa6eec24c0538278d6
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java b/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
new file mode 100644
index 0000000..bde752e
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
@@ -0,0 +1,6 @@
+package org.onlab.onos.net.flow;
+
+public class CompletedBatchOperation {
+
+
+}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRule.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRule.java
index 8b30c74..410aed4 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRule.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRule.java
@@ -2,12 +2,13 @@
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.intent.BatchOperationTarget;
/**
* Represents a generalized match & action pair to be applied to
* an infrastucture device.
*/
-public interface FlowRule {
+public interface FlowRule extends BatchOperationTarget {
static final int MAX_TIMEOUT = 60;
static final int MIN_PRIORITY = 0;
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEntry.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEntry.java
new file mode 100644
index 0000000..d5a1472
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEntry.java
@@ -0,0 +1,20 @@
+package org.onlab.onos.net.flow;
+
+import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
+import org.onlab.onos.net.intent.BatchOperationEntry;
+
+
+public class FlowRuleBatchEntry
+ extends BatchOperationEntry<FlowRuleOperation, FlowRule> {
+
+ public FlowRuleBatchEntry(FlowRuleOperation operator, FlowRule target) {
+ super(operator, target);
+ }
+
+ public enum FlowRuleOperation {
+ ADD,
+ REMOVE,
+ MODIFY
+ }
+
+}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchOperation.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchOperation.java
new file mode 100644
index 0000000..74ef165
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchOperation.java
@@ -0,0 +1,13 @@
+package org.onlab.onos.net.flow;
+
+import java.util.Collection;
+
+import org.onlab.onos.net.intent.BatchOperation;
+
+public class FlowRuleBatchOperation
+ extends BatchOperation<FlowRuleBatchEntry> {
+
+ public FlowRuleBatchOperation(Collection<FlowRuleBatchEntry> operations) {
+ super(operations);
+ }
+}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
index c4e2f92..68762ac 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
@@ -1,6 +1,9 @@
package org.onlab.onos.net.flow;
+import java.util.concurrent.Future;
+
import org.onlab.onos.ApplicationId;
+import org.onlab.onos.net.intent.BatchOperation;
import org.onlab.onos.net.provider.Provider;
/**
@@ -34,4 +37,6 @@
*/
void removeRulesById(ApplicationId id, FlowRule... flowRules);
+ Future<Void> executeBatch(BatchOperation<FlowRuleBatchEntry> batch);
+
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleService.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleService.java
index 8600c54..6d04810 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleService.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleService.java
@@ -1,5 +1,7 @@
package org.onlab.onos.net.flow;
+import java.util.concurrent.Future;
+
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.DeviceId;
@@ -66,7 +68,12 @@
*/
Iterable<FlowRule> getFlowRulesById(ApplicationId id);
- //Future<CompletedBatchOperation> applyBatch(BatchOperation<FlowRuleBatchEntry>)
+ /**
+ * Applies a batch operation of FlowRules.
+ *
+ * @return future indicating the state of the batch operation
+ */
+ Future<CompletedBatchOperation> applyBatch(FlowRuleBatchOperation batch);
/**
* Adds the specified flow rule listener.
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/BatchOperation.java b/core/api/src/main/java/org/onlab/onos/net/intent/BatchOperation.java
index 5d0cbb8..72a9847 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/BatchOperation.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/BatchOperation.java
@@ -1,12 +1,13 @@
package org.onlab.onos.net.intent;
//TODO is this the right package?
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
-import static com.google.common.base.Preconditions.checkNotNull;
-
/**
* A list of BatchOperationEntry.
*
@@ -15,7 +16,7 @@
*/
public abstract class BatchOperation<T extends BatchOperationEntry<?, ?>> {
- private List<T> ops;
+ private final List<T> ops;
/**
* Creates new {@link BatchOperation} object.
@@ -30,7 +31,7 @@
*
* @param batchOperations the list of batch operation entries.
*/
- public BatchOperation(List<T> batchOperations) {
+ public BatchOperation(Collection<T> batchOperations) {
ops = new LinkedList<>(checkNotNull(batchOperations));
}
@@ -61,6 +62,10 @@
/**
* Adds an operation.
+ * FIXME: Brian promises that the Intent Framework
+ * will not modify the batch operation after it has submitted it.
+ * Ali would prefer immutablity, but trusts brian for better or
+ * for worse.
*
* @param entry the operation to be added
* @return this object if succeeded, null otherwise
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/BatchOperationEntry.java b/core/api/src/main/java/org/onlab/onos/net/intent/BatchOperationEntry.java
index b5dfa88..4e57d33 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/BatchOperationEntry.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/BatchOperationEntry.java
@@ -15,14 +15,7 @@
private final T operator;
private final U target;
- /**
- * Default constructor for serializer.
- */
- @Deprecated
- protected BatchOperationEntry() {
- this.operator = null;
- this.target = null;
- }
+
/**
* Constructs new instance for the entry of the BatchOperation.
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
index ce11cea..a9eddd8 100644
--- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
@@ -5,6 +5,10 @@
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -18,8 +22,11 @@
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.device.DeviceService;
+import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowRule;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry;
+import org.onlab.onos.net.flow.FlowRuleBatchOperation;
import org.onlab.onos.net.flow.FlowRuleEvent;
import org.onlab.onos.net.flow.FlowRuleListener;
import org.onlab.onos.net.flow.FlowRuleProvider;
@@ -32,7 +39,9 @@
import org.onlab.onos.net.provider.AbstractProviderService;
import org.slf4j.Logger;
+import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
/**
* Provides implementation of the flow NB & SB APIs.
@@ -131,6 +140,38 @@
}
@Override
+ public Future<CompletedBatchOperation> applyBatch(
+ FlowRuleBatchOperation batch) {
+ Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches =
+ ArrayListMultimap.create();
+ List<Future<Void>> futures = Lists.newArrayList();
+ for (FlowRuleBatchEntry fbe : batch.getOperations()) {
+ final FlowRule f = fbe.getTarget();
+ final Device device = deviceService.getDevice(f.deviceId());
+ final FlowRuleProvider frp = getProvider(device.providerId());
+ batches.put(frp, fbe);
+ switch (fbe.getOperator()) {
+ case ADD:
+ store.storeFlowRule(f);
+ break;
+ case REMOVE:
+ store.deleteFlowRule(f);
+ break;
+ case MODIFY:
+ default:
+ log.error("Batch operation type {} unsupported.", fbe.getOperator());
+ }
+ }
+ for (FlowRuleProvider provider : batches.keySet()) {
+ FlowRuleBatchOperation b =
+ new FlowRuleBatchOperation(batches.get(provider));
+ Future<Void> future = provider.executeBatch(b);
+ futures.add(future);
+ }
+ return new FlowRuleBatchFuture(futures);
+ }
+
+ @Override
public void addListener(FlowRuleListener listener) {
listenerRegistry.addListener(listener);
}
@@ -296,4 +337,63 @@
eventDispatcher.post(event);
}
}
+
+ private class FlowRuleBatchFuture
+ implements Future<CompletedBatchOperation> {
+
+ private final List<Future<Void>> futures;
+
+ public FlowRuleBatchFuture(List<Future<Void>> futures) {
+ this.futures = futures;
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ boolean isDone = true;
+ for (Future<Void> future : futures) {
+ isDone &= future.isDone();
+ }
+ return isDone;
+ }
+
+ @Override
+ public CompletedBatchOperation get() throws InterruptedException,
+ ExecutionException {
+ // TODO Auto-generated method stub
+ for (Future<Void> future : futures) {
+ future.get();
+ }
+ return new CompletedBatchOperation();
+ }
+
+ @Override
+ public CompletedBatchOperation get(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException,
+ TimeoutException {
+ // TODO we should decrement the timeout
+ long start = System.nanoTime();
+ long end = start + unit.toNanos(timeout);
+ for (Future<Void> future : futures) {
+ long now = System.nanoTime();
+ long thisTimeout = end - now;
+ future.get(thisTimeout, TimeUnit.NANOSECONDS);
+ }
+ return new CompletedBatchOperation();
+ }
+
+ }
+
+
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java
index a0995e4..0ca75c2 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java
@@ -4,6 +4,8 @@
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -16,6 +18,9 @@
import org.onlab.onos.net.flow.DefaultFlowRule;
import org.onlab.onos.net.flow.DefaultTrafficSelector;
import org.onlab.onos.net.flow.FlowRule;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
+import org.onlab.onos.net.flow.FlowRuleBatchOperation;
import org.onlab.onos.net.flow.FlowRuleService;
import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.flow.TrafficTreatment;
@@ -24,6 +29,8 @@
import org.onlab.onos.net.intent.PathIntent;
import org.slf4j.Logger;
+import com.google.common.collect.Lists;
+
/**
* Installer for {@link PathIntent path connectivity intents}.
*/
@@ -56,19 +63,27 @@
DefaultTrafficSelector.builder(intent.selector());
Iterator<Link> links = intent.path().links().iterator();
ConnectPoint prev = links.next().dst();
-
+ List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
while (links.hasNext()) {
builder.matchInport(prev.port());
Link link = links.next();
TrafficTreatment treatment = builder()
.setOutput(link.src().port()).build();
+
FlowRule rule = new DefaultFlowRule(link.src().deviceId(),
builder.build(), treatment,
123, appId, 600);
- flowRuleService.applyFlowRules(rule);
+ rules.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule));
+ //flowRuleService.applyFlowRules(rule);
prev = link.dst();
}
-
+ FlowRuleBatchOperation batch = new FlowRuleBatchOperation(rules);
+ try {
+ flowRuleService.applyBatch(batch).get();
+ } catch (InterruptedException | ExecutionException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
}
@Override
@@ -77,6 +92,7 @@
DefaultTrafficSelector.builder(intent.selector());
Iterator<Link> links = intent.path().links().iterator();
ConnectPoint prev = links.next().dst();
+ List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
while (links.hasNext()) {
builder.matchInport(prev.port());
@@ -86,9 +102,16 @@
FlowRule rule = new DefaultFlowRule(link.src().deviceId(),
builder.build(), treatment,
123, appId, 600);
-
- flowRuleService.removeFlowRules(rule);
+ rules.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule));
+ //flowRuleService.removeFlowRules(rule);
prev = link.dst();
}
+ FlowRuleBatchOperation batch = new FlowRuleBatchOperation(rules);
+ try {
+ flowRuleService.applyBatch(batch).get();
+ } catch (InterruptedException | ExecutionException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
}
}
diff --git a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
index 7463671..86f3ddc 100644
--- a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
+++ b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
@@ -12,6 +12,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.Future;
import org.junit.After;
import org.junit.Before;
@@ -32,6 +33,7 @@
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
import org.onlab.onos.net.flow.FlowRule;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry;
import org.onlab.onos.net.flow.FlowRuleEvent;
import org.onlab.onos.net.flow.FlowRuleListener;
import org.onlab.onos.net.flow.FlowRuleProvider;
@@ -42,6 +44,7 @@
import org.onlab.onos.net.flow.TrafficTreatment;
import org.onlab.onos.net.flow.criteria.Criterion;
import org.onlab.onos.net.flow.instructions.Instruction;
+import org.onlab.onos.net.intent.BatchOperation;
import org.onlab.onos.net.provider.AbstractProvider;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.trivial.impl.SimpleFlowRuleStore;
@@ -404,6 +407,13 @@
public void removeRulesById(ApplicationId id, FlowRule... flowRules) {
}
+ @Override
+ public Future<Void> executeBatch(
+ BatchOperation<FlowRuleBatchEntry> batch) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
}
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
index ade651e..78f5874 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
@@ -68,7 +68,7 @@
this.cookie = flowRule.id();
}
- public OFFlowMod buildFlowMod() {
+ public OFFlowMod buildFlowAdd() {
Match match = buildMatch();
List<OFAction> actions = buildActions();
@@ -86,6 +86,24 @@
}
+ public OFFlowMod buildFlowMod() {
+ Match match = buildMatch();
+ List<OFAction> actions = buildActions();
+
+ //TODO: what to do without bufferid? do we assume that there will be a pktout as well?
+ OFFlowMod fm = factory.buildFlowModify()
+ .setCookie(U64.of(cookie.value()))
+ .setBufferId(OFBufferId.NO_BUFFER)
+ .setActions(actions)
+ .setMatch(match)
+ .setFlags(Collections.singleton(OFFlowModFlags.SEND_FLOW_REM))
+ .setPriority(priority)
+ .build();
+
+ return fm;
+
+ }
+
public OFFlowMod buildFlowDel() {
Match match = buildMatch();
List<OFAction> actions = buildActions();
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
index eac3c18..0aca754 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -2,8 +2,17 @@
import static org.slf4j.LoggerFactory.getLogger;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -14,9 +23,11 @@
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowRule;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry;
import org.onlab.onos.net.flow.FlowRuleProvider;
import org.onlab.onos.net.flow.FlowRuleProviderRegistry;
import org.onlab.onos.net.flow.FlowRuleProviderService;
+import org.onlab.onos.net.intent.BatchOperation;
import org.onlab.onos.net.provider.AbstractProvider;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.net.topology.TopologyService;
@@ -27,6 +38,8 @@
import org.onlab.onos.openflow.controller.OpenFlowSwitchListener;
import org.onlab.onos.openflow.controller.RoleState;
import org.projectfloodlight.openflow.protocol.OFActionType;
+import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
+import org.projectfloodlight.openflow.protocol.OFErrorMsg;
import org.projectfloodlight.openflow.protocol.OFFlowRemoved;
import org.projectfloodlight.openflow.protocol.OFFlowStatsEntry;
import org.projectfloodlight.openflow.protocol.OFFlowStatsReply;
@@ -42,9 +55,11 @@
import org.projectfloodlight.openflow.protocol.instruction.OFInstruction;
import org.projectfloodlight.openflow.protocol.instruction.OFInstructionApplyActions;
import org.projectfloodlight.openflow.types.OFPort;
+import org.projectfloodlight.openflow.types.U32;
import org.slf4j.Logger;
import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
@@ -70,6 +85,9 @@
private final InternalFlowProvider listener = new InternalFlowProvider();
+ private final Map<Long, InstallationFuture> pendingFutures =
+ new ConcurrentHashMap<Long, InstallationFuture>();
+
/**
* Creates an OpenFlow host provider.
*/
@@ -101,7 +119,7 @@
private void applyRule(FlowRule flowRule) {
OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
- sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowMod());
+ sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowAdd());
}
@@ -154,6 +172,7 @@
@Override
public void handleMessage(Dpid dpid, OFMessage msg) {
+ InstallationFuture future = null;
switch (msg.getType()) {
case FLOW_REMOVED:
//TODO: make this better
@@ -166,7 +185,17 @@
pushFlowMetrics(dpid, (OFStatsReply) msg);
break;
case BARRIER_REPLY:
+ future = pendingFutures.get(msg.getXid());
+ if (future != null) {
+ future.satisfyRequirement(dpid);
+ }
+ break;
case ERROR:
+ future = pendingFutures.get(msg.getXid());
+ if (future != null) {
+ future.fail((OFErrorMsg) msg, dpid);
+ }
+ break;
default:
log.debug("Unhandled message type: {}", msg.getType());
}
@@ -226,6 +255,144 @@
}
+ @Override
+ public Future<Void> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
+ final Set<Dpid> sws = new HashSet<Dpid>();
+ for (FlowRuleBatchEntry fbe : batch.getOperations()) {
+ FlowRule flowRule = fbe.getTarget();
+ OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
+ sws.add(new Dpid(sw.getId()));
+ switch (fbe.getOperator()) {
+ case ADD:
+ //TODO: Track XID for each flowmod
+ sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowAdd());
+ break;
+ case REMOVE:
+ //TODO: Track XID for each flowmod
+ sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowDel());
+ break;
+ case MODIFY:
+ //TODO: Track XID for each flowmod
+ sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowMod());
+ break;
+ default:
+ log.error("Unsupported batch operation {}", fbe.getOperator());
+ }
+ }
+ InstallationFuture installation = new InstallationFuture(sws);
+ pendingFutures.put(U32.f(batch.hashCode()), installation);
+ installation.verify(batch.hashCode());
+ return installation;
+ }
+
+ private class InstallationFuture implements Future<Void> {
+
+ private final Set<Dpid> sws;
+ private final AtomicBoolean ok = new AtomicBoolean(true);
+ private final List<FlowEntry> offendingFlowMods = Lists.newLinkedList();
+
+ private final CountDownLatch countDownLatch;
+
+ public InstallationFuture(Set<Dpid> sws) {
+ this.sws = sws;
+ countDownLatch = new CountDownLatch(sws.size());
+ }
+
+ public void fail(OFErrorMsg msg, Dpid dpid) {
+ ok.set(false);
+ //TODO add reason to flowentry
+ //TODO handle specific error msgs
+ //offendingFlowMods.add(new FlowEntryBuilder(dpid, msg.));
+ switch (msg.getErrType()) {
+ case BAD_ACTION:
+ break;
+ case BAD_INSTRUCTION:
+ break;
+ case BAD_MATCH:
+ break;
+ case BAD_REQUEST:
+ break;
+ case EXPERIMENTER:
+ break;
+ case FLOW_MOD_FAILED:
+ break;
+ case GROUP_MOD_FAILED:
+ break;
+ case HELLO_FAILED:
+ break;
+ case METER_MOD_FAILED:
+ break;
+ case PORT_MOD_FAILED:
+ break;
+ case QUEUE_OP_FAILED:
+ break;
+ case ROLE_REQUEST_FAILED:
+ break;
+ case SWITCH_CONFIG_FAILED:
+ break;
+ case TABLE_FEATURES_FAILED:
+ break;
+ case TABLE_MOD_FAILED:
+ break;
+ default:
+ break;
+
+ }
+
+ }
+
+ public void satisfyRequirement(Dpid dpid) {
+ log.warn("Satisfaction from switch {}", dpid);
+ sws.remove(controller.getSwitch(dpid));
+ countDownLatch.countDown();
+ }
+
+ public void verify(Integer id) {
+ for (Dpid dpid : sws) {
+ OpenFlowSwitch sw = controller.getSwitch(dpid);
+ OFBarrierRequest.Builder builder = sw.factory()
+ .buildBarrierRequest()
+ .setXid(id);
+ sw.sendMsg(builder.build());
+ }
+
+
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ return sws.isEmpty();
+ }
+
+ @Override
+ public Void get() throws InterruptedException, ExecutionException {
+ countDownLatch.await();
+ //return offendingFlowMods;
+ return null;
+ }
+
+ @Override
+ public Void get(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException,
+ TimeoutException {
+ countDownLatch.await(timeout, unit);
+ //return offendingFlowMods;
+ return null;
+ }
+
+ }
}