clean batch operations
Change-Id: I7187de40bb5276d6ae9e9831e5d47d36e16560ad
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
index 0aca754..2da09fe 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -2,6 +2,7 @@
import static org.slf4j.LoggerFactory.getLogger;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -21,9 +22,12 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.flow.CompletedBatchOperation;
+import org.onlab.onos.net.flow.DefaultFlowEntry;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRuleBatchEntry;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
import org.onlab.onos.net.flow.FlowRuleProvider;
import org.onlab.onos.net.flow.FlowRuleProviderRegistry;
import org.onlab.onos.net.flow.FlowRuleProviderService;
@@ -40,6 +44,7 @@
import org.projectfloodlight.openflow.protocol.OFActionType;
import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
import org.projectfloodlight.openflow.protocol.OFErrorMsg;
+import org.projectfloodlight.openflow.protocol.OFFlowMod;
import org.projectfloodlight.openflow.protocol.OFFlowRemoved;
import org.projectfloodlight.openflow.protocol.OFFlowStatsEntry;
import org.projectfloodlight.openflow.protocol.OFFlowStatsReply;
@@ -52,6 +57,11 @@
import org.projectfloodlight.openflow.protocol.OFVersion;
import org.projectfloodlight.openflow.protocol.action.OFAction;
import org.projectfloodlight.openflow.protocol.action.OFActionOutput;
+import org.projectfloodlight.openflow.protocol.errormsg.OFBadActionErrorMsg;
+import org.projectfloodlight.openflow.protocol.errormsg.OFBadInstructionErrorMsg;
+import org.projectfloodlight.openflow.protocol.errormsg.OFBadMatchErrorMsg;
+import org.projectfloodlight.openflow.protocol.errormsg.OFBadRequestErrorMsg;
+import org.projectfloodlight.openflow.protocol.errormsg.OFFlowModFailedErrorMsg;
import org.projectfloodlight.openflow.protocol.instruction.OFInstruction;
import org.projectfloodlight.openflow.protocol.instruction.OFInstructionApplyActions;
import org.projectfloodlight.openflow.types.OFPort;
@@ -70,6 +80,8 @@
@Component(immediate = true)
public class OpenFlowRuleProvider extends AbstractProvider implements FlowRuleProvider {
+ enum BatchState { STARTED, FINISHED, CANCELLED };
+
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -88,6 +100,9 @@
private final Map<Long, InstallationFuture> pendingFutures =
new ConcurrentHashMap<Long, InstallationFuture>();
+ private final Map<Long, InstallationFuture> pendingFMs =
+ new ConcurrentHashMap<Long, InstallationFuture>(); // flow-mod xid -> future of the batch that sent it; consulted when an ERROR message arrives
+
/**
* Creates an OpenFlow host provider.
*/
@@ -143,9 +158,47 @@
removeFlowRule(flowRules);
}
+ @Override
+ public Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
+ final Set<Dpid> sws = new HashSet<Dpid>();
+ final Map<Long, FlowRuleBatchEntry> fmXids = new HashMap<Long, FlowRuleBatchEntry>();
+ OFFlowMod mod = null;
+ for (FlowRuleBatchEntry fbe : batch.getOperations()) {
+ FlowRule flowRule = fbe.getTarget();
+ OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
+ sws.add(new Dpid(sw.getId()));
+ FlowModBuilder builder = new FlowModBuilder(flowRule, sw.factory());
+ switch (fbe.getOperator()) {
+ case ADD:
+ mod = builder.buildFlowAdd();
+ break;
+ case REMOVE:
+ mod = builder.buildFlowDel();
+ break;
+ case MODIFY:
+ mod = builder.buildFlowMod();
+ break;
+ default:
+ log.error("Unsupported batch operation {}", fbe.getOperator());
+ }
+ if (mod != null) {
+ sw.sendMsg(mod);
+ fmXids.put(mod.getXid(), fbe);
+ } else {
+ log.error("Conversion of flowrule {} failed.", flowRule);
+ }
- //TODO: InternalFlowRuleProvider listening to stats and error and flowremoved.
- // possibly barriers as well. May not be internal at all...
+ }
+ InstallationFuture installation = new InstallationFuture(sws, fmXids);
+ for (Long xid : fmXids.keySet()) {
+ pendingFMs.put(xid, installation);
+ }
+ pendingFutures.put(U32.f(batch.hashCode()), installation);
+ installation.verify(batch.hashCode());
+ return installation;
+ }
+
+
private class InternalFlowProvider
implements OpenFlowSwitchListener, OpenFlowEventListener {
@@ -175,7 +228,6 @@
InstallationFuture future = null;
switch (msg.getType()) {
case FLOW_REMOVED:
- //TODO: make this better
OFFlowRemoved removed = (OFFlowRemoved) msg;
FlowEntry fr = new FlowEntryBuilder(dpid, removed).build();
@@ -191,7 +243,7 @@
}
break;
case ERROR:
- future = pendingFutures.get(msg.getXid());
+ future = pendingFMs.get(msg.getXid());
if (future != null) {
future.fail((OFErrorMsg) msg, dpid);
}
@@ -203,10 +255,7 @@
}
@Override
- public void roleAssertFailed(Dpid dpid, RoleState role) {
- // TODO Auto-generated method stub
-
- }
+ public void roleAssertFailed(Dpid dpid, RoleState role) {} // no-op: this listener does nothing when a role assertion fails
private synchronized void pushFlowMetrics(Dpid dpid, OFStatsReply stats) {
if (stats.getStatsType() != OFStatsType.FLOW) {
@@ -230,7 +279,6 @@
}
private boolean tableMissRule(Dpid dpid, OFFlowStatsEntry reply) {
- // TODO NEED TO FIND A BETTER WAY TO AVOID DOING THIS
if (reply.getVersion().equals(OFVersion.OF_10) ||
reply.getMatch().getMatchFields().iterator().hasNext()) {
return false;
@@ -251,104 +299,91 @@
}
return false;
}
-
}
-
- @Override
- public Future<Void> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
- final Set<Dpid> sws = new HashSet<Dpid>();
-
- for (FlowRuleBatchEntry fbe : batch.getOperations()) {
- FlowRule flowRule = fbe.getTarget();
- OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
- sws.add(new Dpid(sw.getId()));
- switch (fbe.getOperator()) {
- case ADD:
- //TODO: Track XID for each flowmod
- sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowAdd());
- break;
- case REMOVE:
- //TODO: Track XID for each flowmod
- sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowDel());
- break;
- case MODIFY:
- //TODO: Track XID for each flowmod
- sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowMod());
- break;
- default:
- log.error("Unsupported batch operation {}", fbe.getOperator());
- }
- }
- InstallationFuture installation = new InstallationFuture(sws);
- pendingFutures.put(U32.f(batch.hashCode()), installation);
- installation.verify(batch.hashCode());
- return installation;
- }
-
- private class InstallationFuture implements Future<Void> {
+ private class InstallationFuture implements Future<CompletedBatchOperation> {
private final Set<Dpid> sws;
private final AtomicBoolean ok = new AtomicBoolean(true);
+ private final Map<Long, FlowRuleBatchEntry> fms;
+
private final List<FlowEntry> offendingFlowMods = Lists.newLinkedList();
private final CountDownLatch countDownLatch;
+ private Integer pendingXid;
+ private BatchState state;
- public InstallationFuture(Set<Dpid> sws) {
+ public InstallationFuture(Set<Dpid> sws, Map<Long, FlowRuleBatchEntry> fmXids) { // sws: switches still awaited; fmXids: flow-mod xid -> batch entry that was sent
+ this.state = BatchState.STARTED; // every future begins in STARTED
this.sws = sws;
+ this.fms = fmXids; // kept so fail() can look up the rule behind an error's xid
countDownLatch = new CountDownLatch(sws.size());
}
public void fail(OFErrorMsg msg, Dpid dpid) {
ok.set(false);
- //TODO add reason to flowentry
+ FlowEntry fe = null;
+ FlowRuleBatchEntry fbe = fms.get(msg.getXid());
+ FlowRule offending = fbe.getTarget();
//TODO handle specific error msgs
- //offendingFlowMods.add(new FlowEntryBuilder(dpid, msg.));
switch (msg.getErrType()) {
case BAD_ACTION:
+ OFBadActionErrorMsg bad = (OFBadActionErrorMsg) msg;
+ fe = new DefaultFlowEntry(offending, bad.getErrType().ordinal(),
+ bad.getCode().ordinal());
break;
case BAD_INSTRUCTION:
+ OFBadInstructionErrorMsg badins = (OFBadInstructionErrorMsg) msg;
+ fe = new DefaultFlowEntry(offending, badins.getErrType().ordinal(),
+ badins.getCode().ordinal());
break;
case BAD_MATCH:
+ OFBadMatchErrorMsg badMatch = (OFBadMatchErrorMsg) msg;
+ fe = new DefaultFlowEntry(offending, badMatch.getErrType().ordinal(),
+ badMatch.getCode().ordinal());
break;
case BAD_REQUEST:
- break;
- case EXPERIMENTER:
+ OFBadRequestErrorMsg badReq = (OFBadRequestErrorMsg) msg;
+ fe = new DefaultFlowEntry(offending, badReq.getErrType().ordinal(),
+ badReq.getCode().ordinal());
break;
case FLOW_MOD_FAILED:
+ OFFlowModFailedErrorMsg fmFail = (OFFlowModFailedErrorMsg) msg;
+ fe = new DefaultFlowEntry(offending, fmFail.getErrType().ordinal(),
+ fmFail.getCode().ordinal());
break;
+ case EXPERIMENTER:
case GROUP_MOD_FAILED:
- break;
case HELLO_FAILED:
- break;
case METER_MOD_FAILED:
- break;
case PORT_MOD_FAILED:
- break;
case QUEUE_OP_FAILED:
- break;
case ROLE_REQUEST_FAILED:
- break;
case SWITCH_CONFIG_FAILED:
- break;
case TABLE_FEATURES_FAILED:
- break;
case TABLE_MOD_FAILED:
+ fe = new DefaultFlowEntry(offending, msg.getErrType().ordinal(), 0);
break;
default:
- break;
+ log.error("Unknown error type {}", msg.getErrType());
}
+ offendingFlowMods.add(fe);
}
+
public void satisfyRequirement(Dpid dpid) {
log.warn("Satisfaction from switch {}", dpid);
- sws.remove(controller.getSwitch(dpid));
+ sws.remove(dpid); // sws is a Set<Dpid>, so the element to drop is the dpid itself
countDownLatch.countDown();
+ cleanUp(); // unregisters this future from the pending maps once sws is empty
+
}
+
public void verify(Integer id) {
+ pendingXid = id;
for (Dpid dpid : sws) {
OpenFlowSwitch sw = controller.getSwitch(dpid);
OFBarrierRequest.Builder builder = sw.factory()
@@ -356,41 +391,57 @@
.setXid(id);
sw.sendMsg(builder.build());
}
-
-
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
- // TODO Auto-generated method stub
- return false;
+ this.state = BatchState.CANCELLED;
+ cleanUp();
+ for (FlowRuleBatchEntry fbe : fms.values()) {
+ if (fbe.getOperator() == FlowRuleOperation.ADD ||
+ fbe.getOperator() == FlowRuleOperation.MODIFY) {
+ removeFlowRule(fbe.getTarget());
+ } else if (fbe.getOperator() == FlowRuleOperation.REMOVE) {
+ applyRule(fbe.getTarget());
+ }
+
+ }
+ return isCancelled();
}
@Override
public boolean isCancelled() {
- // TODO Auto-generated method stub
- return false;
+ return this.state == BatchState.CANCELLED; // state is set to CANCELLED by cancel()
}
@Override
public boolean isDone() {
- return sws.isEmpty();
+ return this.state == BatchState.FINISHED;
}
@Override
- public Void get() throws InterruptedException, ExecutionException {
+ public CompletedBatchOperation get() throws InterruptedException, ExecutionException { // blocks until the latch reaches zero
countDownLatch.await();
- //return offendingFlowMods;
- return null;
+ this.state = BatchState.FINISHED; // NOTE(review): also replaces CANCELLED if cancel() ran first - confirm intended
+ return new CompletedBatchOperation(ok.get(), offendingFlowMods); // ok is false once fail() has been called
}
+ /**
+ * Waits up to the given time for the latch to reach zero and then reports
+ * the batch outcome.
+ *
+ * @param timeout maximum time to wait
+ * @param unit unit of {@code timeout}
+ * @return outcome flag plus the flow entries recorded by fail()
+ * @throws InterruptedException if the waiting thread is interrupted
+ * @throws TimeoutException if the latch has not reached zero within the timeout
+ */
@Override
- public Void get(long timeout, TimeUnit unit)
+ public CompletedBatchOperation get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException,
TimeoutException {
- countDownLatch.await(timeout, unit);
- //return offendingFlowMods;
- return null;
+ // await() returns false on timeout; without this check an unconfirmed batch would be reported as finished.
+ if (!countDownLatch.await(timeout, unit)) {
+ throw new TimeoutException("Timed out waiting for batch completion");
+ }
+ // NOTE(review): also replaces CANCELLED if cancel() ran first - confirm intended.
+ this.state = BatchState.FINISHED;
+ return new CompletedBatchOperation(ok.get(), offendingFlowMods);
+ }
+
+ /**
+ * Unregisters this future from the provider's pending maps once no switch
+ * is left in {@code sws}; does nothing while switches are still awaited.
+ */
+ private void cleanUp() {
+ if (sws.isEmpty()) {
+ // pendingFutures is keyed by Long (executeBatch stores U32.f(hash)); removing with the
+ // boxed Integer never matches a Long key, so convert the same way before removing.
+ if (pendingXid != null) {
+ pendingFutures.remove(U32.f(pendingXid));
+ }
+ for (Long xid : fms.keySet()) {
+ pendingFMs.remove(xid);
+ }
+ }
}
}