Flow Objective implementation
Provides an abstraction which isolates the application from any pipeline
knowledge. By using the provided objectives, applications can express
their forwarding desires in a pipeline-agnostic way. The objectives
are then consumed by a driver for the specific device, which converts them
into the appropriate pipeline-coherent flows.
Change-Id: I74a68b4971c367c0cd5b7de9d877abdd117afa98
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
index 4e78d04..17b0aa7 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
@@ -17,7 +17,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.Futures;
+import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -43,9 +43,12 @@
import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flowobjective.FilteringObjective;
import org.onosproject.net.flowobjective.FlowObjectiveService;
+import org.onosproject.net.flowobjective.FlowObjectiveStore;
+import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
import org.onosproject.net.flowobjective.ForwardingObjective;
import org.onosproject.net.flowobjective.NextObjective;
import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveEvent;
import org.onosproject.net.group.GroupService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,14 +56,14 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
-import java.util.concurrent.Future;
+import java.util.Set;
import static com.google.common.base.Preconditions.checkState;
/**
* Provides implementation of the flow objective programming service.
*/
-@Component(immediate = false)
+@Component(immediate = true)
@Service
public class FlowObjectiveManager implements FlowObjectiveService {
@@ -89,6 +92,10 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected GroupService groupService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected FlowObjectiveStore flowObjectiveStore;
+
+ private final FlowObjectiveStoreDelegate delegate = new InternalStoreDelegate();
private final Map<DeviceId, DriverHandler> driverHandlers = Maps.newConcurrentMap();
private final Map<DeviceId, Pipeliner> pipeliners = Maps.newConcurrentMap();
@@ -101,10 +108,16 @@
private final Map<DeviceId, Collection<Objective>> pendingObjectives =
Maps.newConcurrentMap();
+
private NodeId localNode;
+ private Map<Integer, Set<PendingNext>> pendingForwards =
+ Maps.newConcurrentMap();
+
+
@Activate
protected void activate() {
+ flowObjectiveStore.setDelegate(delegate);
localNode = clusterService.getLocalNode().id();
mastershipService.addListener(mastershipListener);
deviceService.addListener(deviceListener);
@@ -114,46 +127,64 @@
@Deactivate
protected void deactivate() {
+ flowObjectiveStore.unsetDelegate(delegate);
mastershipService.removeListener(mastershipListener);
deviceService.removeListener(deviceListener);
log.info("Stopped");
}
@Override
- public Future<Boolean> filter(DeviceId deviceId,
- Collection<FilteringObjective> filteringObjectives) {
+ public void filter(DeviceId deviceId,
+ FilteringObjective filteringObjective) {
if (deviceService.isAvailable(deviceId)) {
- return getDevicePipeliner(deviceId).filter(filteringObjectives);
+ getDevicePipeliner(deviceId).filter(filteringObjective);
} else {
- filteringObjectives.forEach(obj -> updatePendingMap(deviceId, obj));
+ updatePendingMap(deviceId, filteringObjective);
}
- return Futures.immediateFuture(true);
- }
-
-
- @Override
- public Future<Boolean> forward(DeviceId deviceId,
- Collection<ForwardingObjective> forwardingObjectives) {
- if (deviceService.isAvailable(deviceId)) {
- return getDevicePipeliner(deviceId).forward(forwardingObjectives);
- } else {
- forwardingObjectives.forEach(obj -> updatePendingMap(deviceId, obj));
- }
- return Futures.immediateFuture(true);
}
@Override
- public Future<Boolean> next(DeviceId deviceId,
- Collection<NextObjective> nextObjectives) {
- if (deviceService.isAvailable(deviceId)) {
- return getDevicePipeliner(deviceId).next(nextObjectives);
- } else {
- nextObjectives.forEach(obj -> updatePendingMap(deviceId, obj));
+ public void forward(DeviceId deviceId,
+ ForwardingObjective forwardingObjective) {
+
+ if (queueObjective(deviceId, forwardingObjective)) {
+ return;
}
- return Futures.immediateFuture(true);
+
+ if (deviceService.isAvailable(deviceId)) {
+ getDevicePipeliner(deviceId).forward(forwardingObjective);
+ } else {
+ updatePendingMap(deviceId, forwardingObjective);
+ }
+
}
+    /**
+     * Submits a next objective for the given device.
+     * <p>
+     * When the device service reports the device as available, the objective
+     * is passed to the device pipeliner; otherwise it is recorded in the
+     * pending map.
+     *
+     * @param deviceId device the objective applies to
+     * @param nextObjective next objective to submit
+     */
+    @Override
+    public void next(DeviceId deviceId,
+                     NextObjective nextObjective) {
+        if (!deviceService.isAvailable(deviceId)) {
+            updatePendingMap(deviceId, nextObjective);
+            return;
+        }
+        getDevicePipeliner(deviceId).next(nextObjective);
+    }
+
+    /**
+     * Parks a forwarding objective whose next objective is not yet known to
+     * the store.
+     * <p>
+     * An objective is parked only when it references a next id and the store
+     * has no next group for that id. Parked objectives are kept in
+     * {@code pendingForwards}, keyed by next id, until the store delegate
+     * removes and processes them.
+     *
+     * @param deviceId device the objective is destined for
+     * @param fwd forwarding objective to examine
+     * @return true if the objective was parked and must not be forwarded now;
+     *         false if it can be processed immediately
+     */
+    private boolean queueObjective(DeviceId deviceId, ForwardingObjective fwd) {
+        Integer nextId = fwd.nextId();
+        if (nextId == null || flowObjectiveStore.getNextGroup(nextId) != null) {
+            return false;
+        }
+
+        log.warn("Queuing forwarding objective.");
+        // Create-or-append in one step. The previous putIfAbsent/get/add
+        // sequence could observe a null set, or add to a set that had already
+        // been drained, if the store delegate removed the entry in between.
+        // NOTE(review): atomicity relies on pendingForwards being backed by a
+        // ConcurrentHashMap (Maps.newConcurrentMap()) - confirm.
+        pendingForwards.compute(nextId, (id, queued) -> {
+            Set<PendingNext> target = queued;
+            if (target == null) {
+                target = Sets.newHashSet();
+            }
+            target.add(new PendingNext(deviceId, fwd));
+            return target;
+        });
+        return true;
+    }
+
+
private void updatePendingMap(DeviceId deviceId, Objective pending) {
if (pendingObjectives.putIfAbsent(deviceId, Lists.newArrayList(pending)) != null) {
Collection<Objective> objectives = pendingObjectives.get(deviceId);
@@ -169,6 +200,33 @@
return pipeliner;
}
+    /**
+     * Binds a pipeline driver to the device, provided the local node is the
+     * device master.
+     * <p>
+     * The driver handler is taken from the handler cache or created on
+     * demand; a newly created handler is cached only if its driver offers
+     * the {@link Pipeliner} behaviour. The pipeliner behaviour is then
+     * initialized and registered for the device if none is registered yet.
+     *
+     * @param deviceId device to set up
+     */
+    private void setupPipelineHandler(DeviceId deviceId) {
+        // Only the master node programs the device.
+        if (!localNode.equals(mastershipService.getMasterFor(deviceId))) {
+            return;
+        }
+
+        DriverHandler driverHandler = driverHandlers.get(deviceId);
+        if (driverHandler == null) {
+            // Cache miss: build a handler and keep it only if usable.
+            try {
+                driverHandler = driverService.createHandler(deviceId);
+                boolean supportsPipeline =
+                        driverHandler.driver().hasBehaviour(Pipeliner.class);
+                if (!supportsPipeline) {
+                    log.warn("Pipeline behaviour not supported for device {}",
+                             deviceId);
+                    return;
+                }
+            } catch (ItemNotFoundException e) {
+                log.warn("No applicable driver for device {}", deviceId);
+                return;
+            }
+            driverHandlers.put(deviceId, driverHandler);
+        }
+
+        // The behaviour is initialized on every call, cached handler or not.
+        Pipeliner devicePipeliner = driverHandler.behaviour(Pipeliner.class);
+        devicePipeliner.init(deviceId, context);
+        pipeliners.putIfAbsent(deviceId, devicePipeliner);
+        log.info("Driver {} bound to device {}", driverHandler.driver().name(), deviceId);
+    }
// Triggers driver setup when the local node becomes a device master.
private class InnerMastershipListener implements MastershipListener {
@@ -221,52 +279,70 @@
pendingObjectives.getOrDefault(deviceId,
Collections.emptySet()).forEach(obj -> {
if (obj instanceof NextObjective) {
- getDevicePipeliner(deviceId)
- .next(Collections.singletonList((NextObjective) obj));
+ next(deviceId, (NextObjective) obj);
} else if (obj instanceof ForwardingObjective) {
- getDevicePipeliner(deviceId)
- .forward(Collections.singletonList((ForwardingObjective) obj));
+ forward(deviceId, (ForwardingObjective) obj);
} else {
getDevicePipeliner(deviceId)
- .filter(Collections.singletonList((FilteringObjective) obj));
+ .filter((FilteringObjective) obj);
}
});
}
}
- private void setupPipelineHandler(DeviceId deviceId) {
- if (localNode.equals(mastershipService.getMasterFor(deviceId))) {
- // Attempt to lookup the handler in the cache
- DriverHandler handler = driverHandlers.get(deviceId);
- if (handler == null) {
- try {
- // Otherwise create it and if it has pipeline behaviour, cache it
- handler = driverService.createHandler(deviceId);
- if (!handler.driver().hasBehaviour(Pipeliner.class)) {
- log.warn("Pipeline behaviour not supported for device {}",
- deviceId);
- return;
- }
- } catch (ItemNotFoundException e) {
- log.warn("No applicable driver for device {}", deviceId);
- return;
- }
- driverHandlers.put(deviceId, handler);
- }
-
- // Always (re)initialize the pipeline behaviour
- Pipeliner pipeliner = handler.behaviour(Pipeliner.class);
- pipeliner.init(deviceId, context);
- pipeliners.putIfAbsent(deviceId, pipeliner);
- log.info("Driver {} bound to device {}", handler.driver().name(), deviceId);
- }
- }
-
// Processing context for initializing pipeline driver behaviours.
private class InnerPipelineContext implements PipelinerContext {
@Override
public ServiceDirectory directory() {
return serviceDirectory;
}
+
+ @Override
+ public FlowObjectiveStore store() {
+ return flowObjectiveStore;
+ }
+
+
+ }
+
+    /**
+     * Store delegate that releases the forwarding objectives parked in
+     * {@code pendingForwards} under the subject of an incoming event.
+     */
+    private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
+        @Override
+        public void notify(ObjectiveEvent event) {
+            // Detach the whole pending set for this subject in one step.
+            Set<PendingNext> released = pendingForwards.remove(event.subject());
+            if (released != null) {
+                log.info("Processing pending objectives {}", released.size());
+
+                for (PendingNext parked : released) {
+                    getDevicePipeliner(parked.deviceId())
+                            .forward(parked.forwardingObjective());
+                }
+            }
+        }
+    }
+
+    /**
+     * Immutable holder for a forwarding objective that could not be
+     * processed because the associated next objective was not present,
+     * together with the device it is destined for.
+     * <p>
+     * Declared static because it uses no state of the enclosing manager and
+     * therefore should not retain a reference to it.
+     */
+    private static final class PendingNext {
+        private final DeviceId deviceId;
+        private final ForwardingObjective fwd;
+
+        /**
+         * Creates a pending entry.
+         *
+         * @param deviceId device the objective is destined for
+         * @param fwd forwarding objective awaiting its next objective
+         */
+        public PendingNext(DeviceId deviceId, ForwardingObjective fwd) {
+            this.deviceId = deviceId;
+            this.fwd = fwd;
+        }
+
+        /**
+         * Returns the device the objective is destined for.
+         *
+         * @return device identifier
+         */
+        public DeviceId deviceId() {
+            return deviceId;
+        }
+
+        /**
+         * Returns the parked forwarding objective.
+         *
+         * @return forwarding objective
+         */
+        public ForwardingObjective forwardingObjective() {
+            return fwd;
+        }
+    }
}