initial working impl of batch operations
Change-Id: Ie970543dec1104a394c7bcfa6eec24c0538278d6
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
index eac3c18..0aca754 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -2,8 +2,17 @@
import static org.slf4j.LoggerFactory.getLogger;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -14,9 +23,11 @@
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowRule;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry;
import org.onlab.onos.net.flow.FlowRuleProvider;
import org.onlab.onos.net.flow.FlowRuleProviderRegistry;
import org.onlab.onos.net.flow.FlowRuleProviderService;
+import org.onlab.onos.net.intent.BatchOperation;
import org.onlab.onos.net.provider.AbstractProvider;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.net.topology.TopologyService;
@@ -27,6 +38,8 @@
import org.onlab.onos.openflow.controller.OpenFlowSwitchListener;
import org.onlab.onos.openflow.controller.RoleState;
import org.projectfloodlight.openflow.protocol.OFActionType;
+import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
+import org.projectfloodlight.openflow.protocol.OFErrorMsg;
import org.projectfloodlight.openflow.protocol.OFFlowRemoved;
import org.projectfloodlight.openflow.protocol.OFFlowStatsEntry;
import org.projectfloodlight.openflow.protocol.OFFlowStatsReply;
@@ -42,9 +55,11 @@
import org.projectfloodlight.openflow.protocol.instruction.OFInstruction;
import org.projectfloodlight.openflow.protocol.instruction.OFInstructionApplyActions;
import org.projectfloodlight.openflow.types.OFPort;
+import org.projectfloodlight.openflow.types.U32;
import org.slf4j.Logger;
import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
@@ -70,6 +85,9 @@
private final InternalFlowProvider listener = new InternalFlowProvider();
+ private final Map<Long, InstallationFuture> pendingFutures =
+ new ConcurrentHashMap<Long, InstallationFuture>();
+
/**
* Creates an OpenFlow host provider.
*/
@@ -101,7 +119,7 @@
private void applyRule(FlowRule flowRule) {
OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
- sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowMod());
+ sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowAdd());
}
@@ -154,6 +172,7 @@
@Override
public void handleMessage(Dpid dpid, OFMessage msg) {
+ InstallationFuture future = null;
switch (msg.getType()) {
case FLOW_REMOVED:
//TODO: make this better
@@ -166,7 +185,17 @@
pushFlowMetrics(dpid, (OFStatsReply) msg);
break;
case BARRIER_REPLY:
+ future = pendingFutures.get(msg.getXid());
+ if (future != null) {
+ future.satisfyRequirement(dpid);
+ }
+ break;
case ERROR:
+ future = pendingFutures.get(msg.getXid());
+ if (future != null) {
+ future.fail((OFErrorMsg) msg, dpid);
+ }
+ break;
default:
log.debug("Unhandled message type: {}", msg.getType());
}
@@ -226,6 +255,144 @@
}
+ @Override
+ public Future<Void> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
+ final Set<Dpid> sws = new HashSet<Dpid>();
+ for (FlowRuleBatchEntry fbe : batch.getOperations()) {
+ FlowRule flowRule = fbe.getTarget();
+ OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
+ sws.add(new Dpid(sw.getId()));
+ switch (fbe.getOperator()) {
+ case ADD:
+ //TODO: Track XID for each flowmod
+ sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowAdd());
+ break;
+ case REMOVE:
+ //TODO: Track XID for each flowmod
+ sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowDel());
+ break;
+ case MODIFY:
+ //TODO: Track XID for each flowmod
+ sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowMod());
+ break;
+ default:
+ log.error("Unsupported batch operation {}", fbe.getOperator());
+ }
+ }
+ InstallationFuture installation = new InstallationFuture(sws);
+ pendingFutures.put(U32.f(batch.hashCode()), installation);
+ installation.verify(batch.hashCode());
+ return installation;
+ }
+
+ private class InstallationFuture implements Future<Void> {
+
+ private final Set<Dpid> sws;
+ private final AtomicBoolean ok = new AtomicBoolean(true);
+ private final List<FlowEntry> offendingFlowMods = Lists.newLinkedList();
+
+ private final CountDownLatch countDownLatch;
+
+ public InstallationFuture(Set<Dpid> sws) {
+ this.sws = sws;
+ countDownLatch = new CountDownLatch(sws.size());
+ }
+
+ public void fail(OFErrorMsg msg, Dpid dpid) {
+ ok.set(false);
+ //TODO add reason to flowentry
+ //TODO handle specific error msgs
+ //offendingFlowMods.add(new FlowEntryBuilder(dpid, msg.));
+ switch (msg.getErrType()) {
+ case BAD_ACTION:
+ break;
+ case BAD_INSTRUCTION:
+ break;
+ case BAD_MATCH:
+ break;
+ case BAD_REQUEST:
+ break;
+ case EXPERIMENTER:
+ break;
+ case FLOW_MOD_FAILED:
+ break;
+ case GROUP_MOD_FAILED:
+ break;
+ case HELLO_FAILED:
+ break;
+ case METER_MOD_FAILED:
+ break;
+ case PORT_MOD_FAILED:
+ break;
+ case QUEUE_OP_FAILED:
+ break;
+ case ROLE_REQUEST_FAILED:
+ break;
+ case SWITCH_CONFIG_FAILED:
+ break;
+ case TABLE_FEATURES_FAILED:
+ break;
+ case TABLE_MOD_FAILED:
+ break;
+ default:
+ break;
+
+ }
+
+ }
+
+ public void satisfyRequirement(Dpid dpid) {
+ log.warn("Satisfaction from switch {}", dpid);
+ sws.remove(controller.getSwitch(dpid));
+ countDownLatch.countDown();
+ }
+
+ public void verify(Integer id) {
+ for (Dpid dpid : sws) {
+ OpenFlowSwitch sw = controller.getSwitch(dpid);
+ OFBarrierRequest.Builder builder = sw.factory()
+ .buildBarrierRequest()
+ .setXid(id);
+ sw.sendMsg(builder.build());
+ }
+
+
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ return sws.isEmpty();
+ }
+
+ @Override
+ public Void get() throws InterruptedException, ExecutionException {
+ countDownLatch.await();
+ //return offendingFlowMods;
+ return null;
+ }
+
+ @Override
+ public Void get(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException,
+ TimeoutException {
+ countDownLatch.await(timeout, unit);
+ //return offendingFlowMods;
+ return null;
+ }
+
+ }
}