Implementation of new Flow Subsystem:
The subsystem no longer returns futures for tracking completion of work.
Notifications are explicitly returned via a callback mechanism. Also, the
subsystem is now asynchronous.
Change-Id: I1a4cef931c24820f9ae9ed9a5398f163f05dfbc9
more flowservice improvements
Change-Id: I5c9c1b6be4b2ebfa523b64f6f52e7634b7d3e05f
more flowservice impl
Change-Id: I05f6774460effb53ced8c36844bcda2f8f6c096f
Manager to store functional (at least I believe so)
Change-Id: I09b04989bd1004c98fe0bafed4c76714b9155d53
flow subsystem functional: need to fix unit tests
Change-Id: I1667f25b91320f625a03e5e1d5e92823184d9de0
flow subsystem functional
Change-Id: I429b3335c16d4fc16f5d55f233dd37c4d1d6111d
finished refactor of flow subsystem
Change-Id: I1899abc6ff6a974a2018d936cc555049c70a6804
fix for null flow provider to use new api
Change-Id: If2fd9bd5baf74d9c61c5c8085cef8bc2d204cbdc
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
index dd0bcaa..70d35aa 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -15,25 +15,13 @@
*/
package org.onosproject.provider.of.flow.impl;
-import static com.google.common.base.Preconditions.checkState;
-import static org.slf4j.LoggerFactory.getLogger;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -41,19 +29,16 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onosproject.core.ApplicationId;
import org.onosproject.net.DeviceId;
-import org.onosproject.net.flow.BatchOperation;
import org.onosproject.net.flow.CompletedBatchOperation;
-import org.onosproject.net.flow.DefaultFlowEntry;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleBatchEntry;
-import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
+import org.onosproject.net.flow.FlowRuleBatchOperation;
import org.onosproject.net.flow.FlowRuleProvider;
import org.onosproject.net.flow.FlowRuleProviderRegistry;
import org.onosproject.net.flow.FlowRuleProviderService;
import org.onosproject.net.provider.AbstractProvider;
import org.onosproject.net.provider.ProviderId;
-import org.onosproject.net.topology.TopologyService;
import org.onosproject.openflow.controller.Dpid;
import org.onosproject.openflow.controller.OpenFlowController;
import org.onosproject.openflow.controller.OpenFlowEventListener;
@@ -63,6 +48,7 @@
import org.projectfloodlight.openflow.protocol.OFActionType;
import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
import org.projectfloodlight.openflow.protocol.OFErrorMsg;
+import org.projectfloodlight.openflow.protocol.OFErrorType;
import org.projectfloodlight.openflow.protocol.OFFlowMod;
import org.projectfloodlight.openflow.protocol.OFFlowRemoved;
import org.projectfloodlight.openflow.protocol.OFFlowStatsEntry;
@@ -75,21 +61,22 @@
import org.projectfloodlight.openflow.protocol.OFVersion;
import org.projectfloodlight.openflow.protocol.action.OFAction;
import org.projectfloodlight.openflow.protocol.action.OFActionOutput;
-import org.projectfloodlight.openflow.protocol.errormsg.OFBadActionErrorMsg;
-import org.projectfloodlight.openflow.protocol.errormsg.OFBadInstructionErrorMsg;
-import org.projectfloodlight.openflow.protocol.errormsg.OFBadMatchErrorMsg;
-import org.projectfloodlight.openflow.protocol.errormsg.OFBadRequestErrorMsg;
import org.projectfloodlight.openflow.protocol.errormsg.OFFlowModFailedErrorMsg;
import org.projectfloodlight.openflow.protocol.instruction.OFInstruction;
import org.projectfloodlight.openflow.protocol.instruction.OFInstructionApplyActions;
import org.projectfloodlight.openflow.types.OFPort;
import org.slf4j.Logger;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
/**
* Provider which uses an OpenFlow controller to detect network
@@ -98,8 +85,6 @@
@Component(immediate = true)
public class OpenFlowRuleProvider extends AbstractProvider implements FlowRuleProvider {
- enum BatchState { STARTED, FINISHED, CANCELLED }
-
private static final int LOWEST_PRIORITY = 0;
private final Logger log = getLogger(getClass());
@@ -110,22 +95,15 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected OpenFlowController controller;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected TopologyService topologyService;
private FlowRuleProviderService providerService;
private final InternalFlowProvider listener = new InternalFlowProvider();
- // FIXME: This should be an expiring map to ensure futures that don't have
- // a future eventually get garbage collected.
- private final Map<Long, InstallationFuture> pendingFutures = new ConcurrentHashMap<>();
-
- private final Map<Long, InstallationFuture> pendingFMs = new ConcurrentHashMap<>();
+ private Cache<Long, InternalCacheEntry> pendingBatches;
private final Map<Dpid, FlowStatsCollector> collectors = Maps.newHashMap();
- private final AtomicLong xidCounter = new AtomicLong(1);
/**
* Creates an OpenFlow host provider.
@@ -140,6 +118,16 @@
controller.addListener(listener);
controller.addEventListener(listener);
+ pendingBatches = CacheBuilder.newBuilder()
+ .expireAfterWrite(10, TimeUnit.SECONDS)
+ .removalListener((RemovalNotification<Long, InternalCacheEntry> notification) -> {
+ if (notification.getCause() == RemovalCause.EXPIRED) {
+ providerService.batchOperationCompleted(notification.getKey(),
+ notification.getValue().failedCompletion());
+ }
+ }).build();
+
+
for (OpenFlowSwitch sw : controller.getSwitches()) {
FlowStatsCollector fsc = new FlowStatsCollector(sw, POLL_INTERVAL);
fsc.start();
@@ -160,8 +148,8 @@
@Override
public void applyFlowRule(FlowRule... flowRules) {
- for (int i = 0; i < flowRules.length; i++) {
- applyRule(flowRules[i]);
+ for (FlowRule flowRule : flowRules) {
+ applyRule(flowRule);
}
}
@@ -179,8 +167,8 @@
@Override
public void removeFlowRule(FlowRule... flowRules) {
- for (int i = 0; i < flowRules.length; i++) {
- removeRule(flowRules[i]);
+ for (FlowRule flowRule : flowRules) {
+ removeRule(flowRule);
}
}
@@ -203,36 +191,20 @@
}
@Override
- public Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
- final Set<Dpid> sws = Sets.newConcurrentHashSet();
- final Map<Long, FlowRuleBatchEntry> fmXids = new HashMap<>();
- /*
- * Use identity hash map for reference equality as we could have equal
- * flow mods for different switches.
- */
- Map<OFFlowMod, OpenFlowSwitch> mods = Maps.newIdentityHashMap();
- Map<OFFlowMod, OpenFlowSwitch.TableType> modTypes = Maps.newIdentityHashMap();
+ public void executeBatch(FlowRuleBatchOperation batch) {
+
+ pendingBatches.put(batch.id(), new InternalCacheEntry(batch));
+
+
+ OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(batch.deviceId().uri()));
+ OFFlowMod mod;
+
for (FlowRuleBatchEntry fbe : batch.getOperations()) {
- FlowRule flowRule = fbe.target();
- final Dpid dpid = Dpid.dpid(flowRule.deviceId().uri());
- OpenFlowSwitch sw = controller.getSwitch(dpid);
- if (sw == null) {
- /*
- * if a switch we are supposed to install to is gone then
- * cancel (ie. rollback) the work that has been done so far
- * and return the associated future.
- */
- InstallationFuture failed = new InstallationFuture(sws, fmXids);
- failed.cancel(true);
- return failed;
- }
- sws.add(dpid);
- final Long flowModXid = xidCounter.getAndIncrement();
+
FlowModBuilder builder =
- FlowModBuilder.builder(flowRule, sw.factory(),
- Optional.of(flowModXid));
- OFFlowMod mod = null;
+ FlowModBuilder.builder(fbe.target(), sw.factory(),
+ Optional.of(batch.id()));
switch (fbe.operator()) {
case ADD:
mod = builder.buildFlowAdd();
@@ -244,34 +216,16 @@
mod = builder.buildFlowMod();
break;
default:
- log.error("Unsupported batch operation {}", fbe.operator());
- }
- if (mod != null) {
- mods.put(mod, sw);
- modTypes.put(mod, getTableType(flowRule.type()));
- fmXids.put(flowModXid, fbe);
- } else {
- log.error("Conversion of flowrule {} failed.", flowRule);
- }
+ log.error("Unsupported batch operation {}; skipping flowmod {}",
+ fbe.operator(), fbe);
+ continue;
+ }
+ sw.sendMsg(mod);
}
- InstallationFuture installation = new InstallationFuture(sws, fmXids);
- for (Long xid : fmXids.keySet()) {
- pendingFMs.put(xid, installation);
- }
-
- pendingFutures.put(installation.xid(), installation);
- for (Map.Entry<OFFlowMod, OpenFlowSwitch> entry : mods.entrySet()) {
- OpenFlowSwitch sw = entry.getValue();
- OFFlowMod mod = entry.getKey();
- OpenFlowSwitch.TableType tableType = modTypes.get(mod);
- if (tableType == OpenFlowSwitch.TableType.NONE) {
- sw.sendMsg(mod);
- } else {
- sw.sendMsg(mod, tableType);
- }
- }
- installation.verify();
- return installation;
+ OFBarrierRequest.Builder builder = sw.factory()
+ .buildBarrierRequest()
+ .setXid(batch.id());
+ sw.sendMsg(builder.build());
}
private OpenFlowSwitch.TableType getTableType(FlowRule.Type type) {
@@ -287,13 +241,12 @@
}
}
+
+
+
private class InternalFlowProvider
implements OpenFlowSwitchListener, OpenFlowEventListener {
-
- private final Multimap<DeviceId, FlowEntry> completeEntries =
- ArrayListMultimap.create();
-
@Override
public void switchAdded(Dpid dpid) {
FlowStatsCollector fsc = new FlowStatsCollector(controller.getSwitch(dpid), POLL_INTERVAL);
@@ -320,7 +273,6 @@
@Override
public void handleMessage(Dpid dpid, OFMessage msg) {
- InstallationFuture future = null;
switch (msg.getType()) {
case FLOW_REMOVED:
OFFlowRemoved removed = (OFFlowRemoved) msg;
@@ -334,22 +286,42 @@
}
break;
case BARRIER_REPLY:
- future = pendingFutures.get(msg.getXid());
- if (future != null) {
- future.satisfyRequirement(dpid);
- } else {
- log.warn("Received unknown Barrier Reply: {}", msg.getXid());
+ try {
+ InternalCacheEntry entry = pendingBatches.getIfPresent(msg.getXid());
+ if (entry != null) {
+ providerService.batchOperationCompleted(msg.getXid(), entry.completed());
+ } else {
+ log.warn("Received unknown Barrier Reply: {}", msg.getXid());
+ }
+ } finally {
+ pendingBatches.invalidate(msg.getXid());
}
break;
case ERROR:
log.warn("received Error message {} from {}", msg, dpid);
- future = pendingFMs.get(msg.getXid());
- if (future != null) {
- future.fail((OFErrorMsg) msg, dpid);
+
+ OFErrorMsg error = (OFErrorMsg) msg;
+ if (error.getErrType() == OFErrorType.FLOW_MOD_FAILED) {
+ OFFlowModFailedErrorMsg fmFailed = (OFFlowModFailedErrorMsg) error;
+ if (fmFailed.getData().getParsedMessage().isPresent()) {
+ OFMessage m = fmFailed.getData().getParsedMessage().get();
+ OFFlowMod fm = (OFFlowMod) m;
+ InternalCacheEntry entry = pendingBatches.getIfPresent(msg.getXid());
+ if (entry != null) {
+ entry.appendFailure(new FlowEntryBuilder(dpid, fm).build());
+ } else {
+ log.error("No matching batch for this error: {}", error);
+ }
+ } else {
+ //FIXME: Potentially add flowtracking to avoid this message.
+ log.error("Flow installation failed but switch didn't" +
+ " tell us which one.");
+ }
} else {
- log.warn("Received unknown Error Reply: {} {}", msg.getXid(), msg);
+ log.warn("Received error {}", error);
}
- break;
+
+
default:
log.debug("Unhandled message type: {}", msg.getType());
}
@@ -402,198 +374,50 @@
}
- private class InstallationFuture implements Future<CompletedBatchOperation> {
+ /**
+ * The internal cache entry holding the original request as well
+ * as accumulating any failures along the way.
+ *
+ * If this entry is evicted from the cache then the entire operation
+ * is considered failed. Otherwise, only the failures reported by the device
+ * will be propagated up.
+ */
+ private class InternalCacheEntry {
- // barrier xid
- private final Long xid;
- // waiting for barrier reply from...
- private final Set<Dpid> sws;
- private final AtomicBoolean ok = new AtomicBoolean(true);
- // FlowMod xid ->
- private final Map<Long, FlowRuleBatchEntry> fms;
+ private final FlowRuleBatchOperation operation;
+ private final Set<FlowRule> failures = Sets.newConcurrentHashSet();
-
- private final Set<FlowEntry> offendingFlowMods = Sets.newHashSet();
- // Failed batch operation id
- private Long failedId;
-
- private final CountDownLatch countDownLatch;
- private BatchState state;
-
- public InstallationFuture(Set<Dpid> sws, Map<Long, FlowRuleBatchEntry> fmXids) {
- this.xid = xidCounter.getAndIncrement();
- this.state = BatchState.STARTED;
- this.sws = sws;
- this.fms = fmXids;
- countDownLatch = new CountDownLatch(sws.size());
+ public InternalCacheEntry(FlowRuleBatchOperation operation) {
+ this.operation = operation;
}
- public Long xid() {
- return xid;
+ /**
+ * Appends a failed rule to the set of failed items.
+ * @param rule the failed rule
+ */
+ public void appendFailure(FlowRule rule) {
+ failures.add(rule);
}
- public void fail(OFErrorMsg msg, Dpid dpid) {
-
- ok.set(false);
- FlowEntry fe = null;
- FlowRuleBatchEntry fbe = fms.get(msg.getXid());
- failedId = fbe.id();
- FlowRule offending = fbe.target();
- //TODO handle specific error msgs
- switch (msg.getErrType()) {
- case BAD_ACTION:
- OFBadActionErrorMsg bad = (OFBadActionErrorMsg) msg;
- fe = new DefaultFlowEntry(offending, bad.getErrType().ordinal(),
- bad.getCode().ordinal());
- break;
- case BAD_INSTRUCTION:
- OFBadInstructionErrorMsg badins = (OFBadInstructionErrorMsg) msg;
- fe = new DefaultFlowEntry(offending, badins.getErrType().ordinal(),
- badins.getCode().ordinal());
- break;
- case BAD_MATCH:
- OFBadMatchErrorMsg badMatch = (OFBadMatchErrorMsg) msg;
- fe = new DefaultFlowEntry(offending, badMatch.getErrType().ordinal(),
- badMatch.getCode().ordinal());
- break;
- case BAD_REQUEST:
- OFBadRequestErrorMsg badReq = (OFBadRequestErrorMsg) msg;
- fe = new DefaultFlowEntry(offending, badReq.getErrType().ordinal(),
- badReq.getCode().ordinal());
- break;
- case FLOW_MOD_FAILED:
- OFFlowModFailedErrorMsg fmFail = (OFFlowModFailedErrorMsg) msg;
- fe = new DefaultFlowEntry(offending, fmFail.getErrType().ordinal(),
- fmFail.getCode().ordinal());
- break;
- case EXPERIMENTER:
- case GROUP_MOD_FAILED:
- case HELLO_FAILED:
- case METER_MOD_FAILED:
- case PORT_MOD_FAILED:
- case QUEUE_OP_FAILED:
- case ROLE_REQUEST_FAILED:
- case SWITCH_CONFIG_FAILED:
- case TABLE_FEATURES_FAILED:
- case TABLE_MOD_FAILED:
- fe = new DefaultFlowEntry(offending, msg.getErrType().ordinal(), 0);
- break;
- default:
- log.error("Unknown error type {}", msg.getErrType());
-
- }
- offendingFlowMods.add(fe);
-
- removeRequirement(dpid);
+ /**
+ * Fails the entire batch and returns the failed operation.
+ * @return the failed operation
+ */
+ public CompletedBatchOperation failedCompletion() {
+ Set<FlowRule> fails = operation.getOperations().stream()
+ .map(op -> op.target()).collect(Collectors.toSet());
+ return new CompletedBatchOperation(false, Collections.unmodifiableSet(fails), operation.deviceId());
}
-
- public void satisfyRequirement(Dpid dpid) {
- log.debug("Satisfaction from switch {}", dpid);
- removeRequirement(dpid);
+ /**
+ * Returns the completed operation and whether the batch succeeded.
+ * @return the completed operation
+ */
+ public CompletedBatchOperation completed() {
+ return new CompletedBatchOperation(failures.isEmpty(),
+ Collections.unmodifiableSet(failures), operation.deviceId());
}
-
- public void verify() {
- checkState(!sws.isEmpty());
- for (Dpid dpid : sws) {
- OpenFlowSwitch sw = controller.getSwitch(dpid);
- OFBarrierRequest.Builder builder = sw.factory()
- .buildBarrierRequest()
- .setXid(xid);
- sw.sendMsg(builder.build());
- }
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- if (isDone()) {
- return false;
- }
- ok.set(false);
- this.state = BatchState.CANCELLED;
- cleanUp();
- for (FlowRuleBatchEntry fbe : fms.values()) {
- if (fbe.operator() == FlowRuleOperation.ADD ||
- fbe.operator() == FlowRuleOperation.MODIFY) {
- removeFlowRule(fbe.target());
- } else if (fbe.operator() == FlowRuleOperation.REMOVE) {
- applyRule(fbe.target());
- }
-
- }
- return true;
- }
-
- @Override
- public boolean isCancelled() {
- return this.state == BatchState.CANCELLED;
- }
-
- @Override
- public boolean isDone() {
- return this.state == BatchState.FINISHED || isCancelled();
- }
-
- @Override
- public CompletedBatchOperation get() throws InterruptedException, ExecutionException {
- countDownLatch.await();
- this.state = BatchState.FINISHED;
- Set<Long> failedIds = (failedId != null) ? Sets.newHashSet(failedId) : Collections.emptySet();
- CompletedBatchOperation result =
- new CompletedBatchOperation(ok.get(), offendingFlowMods, failedIds);
- //FIXME do cleanup here (moved by BOC)
- cleanUp();
- return result;
- }
-
- @Override
- public CompletedBatchOperation get(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException,
- TimeoutException {
- if (countDownLatch.await(timeout, unit)) {
- this.state = BatchState.FINISHED;
- Set<Long> failedIds = (failedId != null) ? Sets.newHashSet(failedId) : Collections.emptySet();
- CompletedBatchOperation result =
- new CompletedBatchOperation(ok.get(), offendingFlowMods, failedIds);
- // FIXME do cleanup here (moved by BOC)
- cleanUp();
- return result;
- }
- throw new TimeoutException(this.toString());
- }
-
- private void cleanUp() {
- if (isDone() || isCancelled()) {
- pendingFutures.remove(xid);
- for (Long xid : fms.keySet()) {
- pendingFMs.remove(xid);
- }
- }
- }
-
- private void removeRequirement(Dpid dpid) {
- countDownLatch.countDown();
- sws.remove(dpid);
- //FIXME don't do cleanup here (moved by BOC)
- //cleanUp();
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("xid", xid)
- .add("pending devices", sws)
- .add("devices in batch",
- fms.values().stream()
- .map((fbe) -> fbe.target().deviceId())
- .distinct().collect(Collectors.toList()))
- .add("failedId", failedId)
- .add("latchCount", countDownLatch.getCount())
- .add("state", state)
- .add("no error?", ok.get())
- .toString();
- }
}
}