Upgrade packet requests to use flow objectives API.
Addressed a few issues found while using the flow objectives across a cluster:
* Flow objectives should be installable from any node, not just the master.
Therefore we need to ensure all nodes initialize a driver for each switch.
* We no longer store a list of objectives that are waiting for the switch
to arrive. If the we don't know about the switch yet we'll try a few times
over a few seconds to find it, but after that we'll give up and report an
error to the client.
* Default drivers need to be available when the FlowObjectiveManager starts
up, otherwise it is common to get flow objective requests before any
drivers have been loaded.
Change-Id: I1c2ea6a223232402c31e8139729e4b6251ab8b0f
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
index 7f6b0ee..79d486e 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
@@ -15,7 +15,6 @@
*/
package org.onosproject.net.flowobjective.impl;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
@@ -48,17 +47,18 @@
import org.onosproject.net.flowobjective.ForwardingObjective;
import org.onosproject.net.flowobjective.NextObjective;
import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveError;
import org.onosproject.net.flowobjective.ObjectiveEvent;
import org.onosproject.net.group.GroupService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collection;
-import java.util.Collections;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
-import static com.google.common.base.Preconditions.checkState;
+import static org.onlab.util.Tools.groupedThreads;
/**
* Provides implementation of the flow objective programming service.
@@ -67,9 +67,10 @@
@Service
public class FlowObjectiveManager implements FlowObjectiveService {
- private final Logger log = LoggerFactory.getLogger(getClass());
+ public static final int INSTALL_RETRY_ATTEMPTS = 5;
+ public static final long INSTALL_RETRY_INTERVAL = 1000; // ms
- public static final String NOT_INITIALIZED = "Driver not initialized";
+ private final Logger log = LoggerFactory.getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DriverService driverService;
@@ -106,17 +107,18 @@
protected ServiceDirectory serviceDirectory = new DefaultServiceDirectory();
- private final Map<DeviceId, Collection<Objective>> pendingObjectives =
- Maps.newConcurrentMap();
-
private NodeId localNode;
private Map<Integer, Set<PendingNext>> pendingForwards =
Maps.newConcurrentMap();
+ private ExecutorService executorService;
@Activate
protected void activate() {
+ executorService = Executors.newFixedThreadPool(
+ 4, groupedThreads("onos/objective-installer", "%d"));
+
flowObjectiveStore.setDelegate(delegate);
localNode = clusterService.getLocalNode().id();
mastershipService.addListener(mastershipListener);
@@ -133,15 +135,55 @@
log.info("Stopped");
}
- @Override
- public void filter(DeviceId deviceId,
- FilteringObjective filteringObjective) {
- if (deviceService.isAvailable(deviceId)) {
- getDevicePipeliner(deviceId).filter(filteringObjective);
- } else {
- updatePendingMap(deviceId, filteringObjective);
+ /**
+ * Task that passes the flow objective down to the driver. The task will
+ * make a few attempts to find the appropriate driver, then eventually give
+ * up and report an error if no suitable driver could be found.
+ */
+ private class ObjectiveInstaller implements Runnable {
+ private final DeviceId deviceId;
+ private final Objective objective;
+
+ private int numAttempts = 0;
+
+ public ObjectiveInstaller(DeviceId deviceId, Objective objective) {
+ this.deviceId = deviceId;
+ this.objective = objective;
}
+ @Override
+ public void run() {
+ try {
+ numAttempts++;
+
+ Pipeliner pipeliner = getDevicePipeliner(deviceId);
+
+ if (pipeliner != null) {
+ if (objective instanceof NextObjective) {
+ pipeliner.next((NextObjective) objective);
+ } else if (objective instanceof ForwardingObjective) {
+ pipeliner.forward((ForwardingObjective) objective);
+ } else {
+ pipeliner.filter((FilteringObjective) objective);
+ }
+ } else if (numAttempts < INSTALL_RETRY_ATTEMPTS) {
+ Thread.currentThread().sleep(INSTALL_RETRY_INTERVAL);
+ executorService.submit(this);
+ } else {
+ // Otherwise we've tried a few times and failed, report an
+ // error back to the user.
+ objective.context().ifPresent(
+ c -> c.onError(objective, ObjectiveError.DEVICEMISSING));
+ }
+ } catch (Exception e) {
+ log.warn("Exception while installing flow objective", e);
+ }
+ }
+ }
+
+ @Override
+ public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
+ executorService.submit(new ObjectiveInstaller(deviceId, filteringObjective));
}
@Override
@@ -152,22 +194,12 @@
return;
}
- if (deviceService.isAvailable(deviceId)) {
- getDevicePipeliner(deviceId).forward(forwardingObjective);
- } else {
- updatePendingMap(deviceId, forwardingObjective);
- }
-
+ executorService.submit(new ObjectiveInstaller(deviceId, forwardingObjective));
}
@Override
- public void next(DeviceId deviceId,
- NextObjective nextObjective) {
- if (deviceService.isAvailable(deviceId)) {
- getDevicePipeliner(deviceId).next(nextObjective);
- } else {
- updatePendingMap(deviceId, nextObjective);
- }
+ public void next(DeviceId deviceId, NextObjective nextObjective) {
+ executorService.submit(new ObjectiveInstaller(deviceId, nextObjective));
}
@Override
@@ -189,49 +221,38 @@
return false;
}
-
- private void updatePendingMap(DeviceId deviceId, Objective pending) {
- if (pendingObjectives.putIfAbsent(deviceId, Lists.newArrayList(pending)) != null) {
- Collection<Objective> objectives = pendingObjectives.get(deviceId);
- objectives.add(pending);
- }
-
- }
-
// Retrieves the device pipeline behaviour from the cache.
private Pipeliner getDevicePipeliner(DeviceId deviceId) {
Pipeliner pipeliner = pipeliners.get(deviceId);
- checkState(pipeliner != null, NOT_INITIALIZED);
return pipeliner;
}
private void setupPipelineHandler(DeviceId deviceId) {
- if (localNode.equals(mastershipService.getMasterFor(deviceId))) {
- // Attempt to lookup the handler in the cache
- DriverHandler handler = driverHandlers.get(deviceId);
- if (handler == null) {
- try {
- // Otherwise create it and if it has pipeline behaviour, cache it
- handler = driverService.createHandler(deviceId);
- if (!handler.driver().hasBehaviour(Pipeliner.class)) {
- log.warn("Pipeline behaviour not supported for device {}",
- deviceId);
- return;
- }
- } catch (ItemNotFoundException e) {
- log.warn("No applicable driver for device {}", deviceId);
+ // Attempt to lookup the handler in the cache
+ DriverHandler handler = driverHandlers.get(deviceId);
+ if (handler == null) {
+ try {
+ // Otherwise create it and if it has pipeline behaviour, cache it
+ handler = driverService.createHandler(deviceId);
+ if (!handler.driver().hasBehaviour(Pipeliner.class)) {
+ log.warn("Pipeline behaviour not supported for device {}",
+ deviceId);
return;
}
- driverHandlers.put(deviceId, handler);
+ } catch (ItemNotFoundException e) {
+ log.warn("No applicable driver for device {}", deviceId);
+ return;
}
- // Always (re)initialize the pipeline behaviour
- log.info("Driver {} bound to device {} ... initializing driver",
- handler.driver().name(), deviceId);
- Pipeliner pipeliner = handler.behaviour(Pipeliner.class);
- pipeliner.init(deviceId, context);
- pipeliners.putIfAbsent(deviceId, pipeliner);
+ driverHandlers.put(deviceId, handler);
}
+
+ // Always (re)initialize the pipeline behaviour
+ log.info("Driver {} bound to device {} ... initializing driver",
+ handler.driver().name(), deviceId);
+ Pipeliner pipeliner = handler.behaviour(Pipeliner.class);
+ pipeliner.init(deviceId, context);
+ pipeliners.putIfAbsent(deviceId, pipeliner);
}
// Triggers driver setup when the local node becomes a device master.
@@ -240,10 +261,8 @@
public void event(MastershipEvent event) {
switch (event.type()) {
case MASTER_CHANGED:
- if (event.roleInfo().master() != null) {
- setupPipelineHandler(event.subject());
- log.info("mastership changed on device {}", event.subject());
- }
+ log.info("mastership changed on device {}", event.subject());
+ setupPipelineHandler(event.subject());
break;
case BACKUPS_CHANGED:
break;
@@ -259,13 +278,14 @@
public void event(DeviceEvent event) {
switch (event.type()) {
case DEVICE_ADDED:
+ setupPipelineHandler(event.subject().id());
+ break;
case DEVICE_AVAILABILITY_CHANGED:
log.info("Device either added or availability changed {}",
event.subject().id());
if (deviceService.isAvailable(event.subject().id())) {
log.info("Device is now available {}", event.subject().id());
setupPipelineHandler(event.subject().id());
- processPendingObjectives(event.subject().id());
}
break;
case DEVICE_UPDATED:
@@ -284,22 +304,6 @@
break;
}
}
-
- private void processPendingObjectives(DeviceId deviceId) {
- log.debug("Processing pending objectives for device {}", deviceId);
-
- pendingObjectives.getOrDefault(deviceId,
- Collections.emptySet()).forEach(obj -> {
- if (obj instanceof NextObjective) {
- next(deviceId, (NextObjective) obj);
- } else if (obj instanceof ForwardingObjective) {
- forward(deviceId, (ForwardingObjective) obj);
- } else {
- getDevicePipeliner(deviceId)
- .filter((FilteringObjective) obj);
- }
- });
- }
}
// Processing context for initializing pipeline driver behaviours.
@@ -313,8 +317,6 @@
public FlowObjectiveStore store() {
return flowObjectiveStore;
}
-
-
}
private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
@@ -356,7 +358,5 @@
public ForwardingObjective forwardingObjective() {
return fwd;
}
-
-
}
}