Upgrade packet requests to use flow objectives API.
Addressed a few issues found while using the flow objectives across a cluster:
* Flow objectives should be installable from any node, not just the master.
Therefore we need to ensure all nodes initialize a driver for each switch.
* We no longer store a list of objectives that are waiting for the switch
to arrive. If the we don't know about the switch yet we'll try a few times
over a few seconds to find it, but after that we'll give up and report an
error to the client.
* Default drivers need to be available when the FlowObjectiveManager starts
up, otherwise it is common to get flow objective requests before any
drivers have been loaded.
Change-Id: I1c2ea6a223232402c31e8139729e4b6251ab8b0f
diff --git a/core/api/src/main/java/org/onosproject/net/driver/DefaultDriverProviderService.java b/core/api/src/main/java/org/onosproject/net/driver/DefaultDriverProviderService.java
new file mode 100644
index 0000000..2b7eeff
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/driver/DefaultDriverProviderService.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.driver;
+
+/**
+ * Service capable of providing a set of default drivers.
+ */
+public interface DefaultDriverProviderService extends DriverProvider {
+}
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/DefaultForwardingObjective.java b/core/api/src/main/java/org/onosproject/net/flowobjective/DefaultForwardingObjective.java
index 6489bea..9bbe991 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/DefaultForwardingObjective.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/DefaultForwardingObjective.java
@@ -36,7 +36,7 @@
private final int timeout;
private final ApplicationId appId;
private final int priority;
- private final int nextId;
+ private final Integer nextId;
private final TrafficTreatment treatment;
private final Operation op;
private final Optional<ObjectiveContext> context;
@@ -46,7 +46,7 @@
private DefaultForwardingObjective(TrafficSelector selector,
Flag flag, boolean permanent,
int timeout, ApplicationId appId,
- int priority, int nextId,
+ int priority, Integer nextId,
TrafficTreatment treatment, Operation op) {
this.selector = selector;
this.flag = flag;
@@ -67,7 +67,7 @@
private DefaultForwardingObjective(TrafficSelector selector,
Flag flag, boolean permanent,
int timeout, ApplicationId appId,
- int priority, int nextId,
+ int priority, Integer nextId,
TrafficTreatment treatment,
ObjectiveContext context, Operation op) {
this.selector = selector;
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveError.java b/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveError.java
index 6e60ab6..192b206 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveError.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveError.java
@@ -41,6 +41,11 @@
GROUPMISSING,
/**
+ * The device was not available to install objectives to.
+ */
+ DEVICEMISSING,
+
+ /**
* An unknown error occurred.
*/
UNKNOWN
diff --git a/core/net/src/main/java/org/onosproject/net/driver/impl/DriverManager.java b/core/net/src/main/java/org/onosproject/net/driver/impl/DriverManager.java
index 5f7e2e2..c3160be 100644
--- a/core/net/src/main/java/org/onosproject/net/driver/impl/DriverManager.java
+++ b/core/net/src/main/java/org/onosproject/net/driver/impl/DriverManager.java
@@ -31,6 +31,7 @@
import org.onosproject.net.driver.DefaultDriverData;
import org.onosproject.net.driver.DefaultDriverHandler;
import org.onosproject.net.driver.DefaultDriverProvider;
+import org.onosproject.net.driver.DefaultDriverProviderService;
import org.onosproject.net.driver.Driver;
import org.onosproject.net.driver.DriverAdminService;
import org.onosproject.net.driver.DriverHandler;
@@ -62,16 +63,21 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DefaultDriverProviderService defaultDriverService;
+
private Set<DriverProvider> providers = Sets.newConcurrentHashSet();
private Map<String, Driver> driverByKey = Maps.newConcurrentMap();
@Activate
protected void activate() {
+ registerProvider(defaultDriverService);
log.info("Started");
}
@Deactivate
protected void deactivate() {
+ unregisterProvider(defaultDriverService);
log.info("Stopped");
}
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
index 7f6b0ee..79d486e 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
@@ -15,7 +15,6 @@
*/
package org.onosproject.net.flowobjective.impl;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
@@ -48,17 +47,18 @@
import org.onosproject.net.flowobjective.ForwardingObjective;
import org.onosproject.net.flowobjective.NextObjective;
import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveError;
import org.onosproject.net.flowobjective.ObjectiveEvent;
import org.onosproject.net.group.GroupService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collection;
-import java.util.Collections;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
-import static com.google.common.base.Preconditions.checkState;
+import static org.onlab.util.Tools.groupedThreads;
/**
* Provides implementation of the flow objective programming service.
@@ -67,9 +67,10 @@
@Service
public class FlowObjectiveManager implements FlowObjectiveService {
- private final Logger log = LoggerFactory.getLogger(getClass());
+ public static final int INSTALL_RETRY_ATTEMPTS = 5;
+ public static final long INSTALL_RETRY_INTERVAL = 1000; // ms
- public static final String NOT_INITIALIZED = "Driver not initialized";
+ private final Logger log = LoggerFactory.getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DriverService driverService;
@@ -106,17 +107,18 @@
protected ServiceDirectory serviceDirectory = new DefaultServiceDirectory();
- private final Map<DeviceId, Collection<Objective>> pendingObjectives =
- Maps.newConcurrentMap();
-
private NodeId localNode;
private Map<Integer, Set<PendingNext>> pendingForwards =
Maps.newConcurrentMap();
+ private ExecutorService executorService;
@Activate
protected void activate() {
+ executorService = Executors.newFixedThreadPool(
+ 4, groupedThreads("onos/objective-installer", "%d"));
+
flowObjectiveStore.setDelegate(delegate);
localNode = clusterService.getLocalNode().id();
mastershipService.addListener(mastershipListener);
@@ -133,15 +135,55 @@
log.info("Stopped");
}
- @Override
- public void filter(DeviceId deviceId,
- FilteringObjective filteringObjective) {
- if (deviceService.isAvailable(deviceId)) {
- getDevicePipeliner(deviceId).filter(filteringObjective);
- } else {
- updatePendingMap(deviceId, filteringObjective);
+ /**
+ * Task that passes the flow objective down to the driver. The task will
+ * make a few attempts to find the appropriate driver, then eventually give
+ * up and report an error if no suitable driver could be found.
+ */
+ private class ObjectiveInstaller implements Runnable {
+ private final DeviceId deviceId;
+ private final Objective objective;
+
+ private int numAttempts = 0;
+
+ public ObjectiveInstaller(DeviceId deviceId, Objective objective) {
+ this.deviceId = deviceId;
+ this.objective = objective;
}
+ @Override
+ public void run() {
+ try {
+ numAttempts++;
+
+ Pipeliner pipeliner = getDevicePipeliner(deviceId);
+
+ if (pipeliner != null) {
+ if (objective instanceof NextObjective) {
+ pipeliner.next((NextObjective) objective);
+ } else if (objective instanceof ForwardingObjective) {
+ pipeliner.forward((ForwardingObjective) objective);
+ } else {
+ pipeliner.filter((FilteringObjective) objective);
+ }
+ } else if (numAttempts < INSTALL_RETRY_ATTEMPTS) {
+ Thread.currentThread().sleep(INSTALL_RETRY_INTERVAL);
+ executorService.submit(this);
+ } else {
+ // Otherwise we've tried a few times and failed, report an
+ // error back to the user.
+ objective.context().ifPresent(
+ c -> c.onError(objective, ObjectiveError.DEVICEMISSING));
+ }
+ } catch (Exception e) {
+ log.warn("Exception while installing flow objective", e);
+ }
+ }
+ }
+
+ @Override
+ public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
+ executorService.submit(new ObjectiveInstaller(deviceId, filteringObjective));
}
@Override
@@ -152,22 +194,12 @@
return;
}
- if (deviceService.isAvailable(deviceId)) {
- getDevicePipeliner(deviceId).forward(forwardingObjective);
- } else {
- updatePendingMap(deviceId, forwardingObjective);
- }
-
+ executorService.submit(new ObjectiveInstaller(deviceId, forwardingObjective));
}
@Override
- public void next(DeviceId deviceId,
- NextObjective nextObjective) {
- if (deviceService.isAvailable(deviceId)) {
- getDevicePipeliner(deviceId).next(nextObjective);
- } else {
- updatePendingMap(deviceId, nextObjective);
- }
+ public void next(DeviceId deviceId, NextObjective nextObjective) {
+ executorService.submit(new ObjectiveInstaller(deviceId, nextObjective));
}
@Override
@@ -189,49 +221,38 @@
return false;
}
-
- private void updatePendingMap(DeviceId deviceId, Objective pending) {
- if (pendingObjectives.putIfAbsent(deviceId, Lists.newArrayList(pending)) != null) {
- Collection<Objective> objectives = pendingObjectives.get(deviceId);
- objectives.add(pending);
- }
-
- }
-
// Retrieves the device pipeline behaviour from the cache.
private Pipeliner getDevicePipeliner(DeviceId deviceId) {
Pipeliner pipeliner = pipeliners.get(deviceId);
- checkState(pipeliner != null, NOT_INITIALIZED);
return pipeliner;
}
private void setupPipelineHandler(DeviceId deviceId) {
- if (localNode.equals(mastershipService.getMasterFor(deviceId))) {
- // Attempt to lookup the handler in the cache
- DriverHandler handler = driverHandlers.get(deviceId);
- if (handler == null) {
- try {
- // Otherwise create it and if it has pipeline behaviour, cache it
- handler = driverService.createHandler(deviceId);
- if (!handler.driver().hasBehaviour(Pipeliner.class)) {
- log.warn("Pipeline behaviour not supported for device {}",
- deviceId);
- return;
- }
- } catch (ItemNotFoundException e) {
- log.warn("No applicable driver for device {}", deviceId);
+ // Attempt to lookup the handler in the cache
+ DriverHandler handler = driverHandlers.get(deviceId);
+ if (handler == null) {
+ try {
+ // Otherwise create it and if it has pipeline behaviour, cache it
+ handler = driverService.createHandler(deviceId);
+ if (!handler.driver().hasBehaviour(Pipeliner.class)) {
+ log.warn("Pipeline behaviour not supported for device {}",
+ deviceId);
return;
}
- driverHandlers.put(deviceId, handler);
+ } catch (ItemNotFoundException e) {
+ log.warn("No applicable driver for device {}", deviceId);
+ return;
}
- // Always (re)initialize the pipeline behaviour
- log.info("Driver {} bound to device {} ... initializing driver",
- handler.driver().name(), deviceId);
- Pipeliner pipeliner = handler.behaviour(Pipeliner.class);
- pipeliner.init(deviceId, context);
- pipeliners.putIfAbsent(deviceId, pipeliner);
+ driverHandlers.put(deviceId, handler);
}
+
+ // Always (re)initialize the pipeline behaviour
+ log.info("Driver {} bound to device {} ... initializing driver",
+ handler.driver().name(), deviceId);
+ Pipeliner pipeliner = handler.behaviour(Pipeliner.class);
+ pipeliner.init(deviceId, context);
+ pipeliners.putIfAbsent(deviceId, pipeliner);
}
// Triggers driver setup when the local node becomes a device master.
@@ -240,10 +261,8 @@
public void event(MastershipEvent event) {
switch (event.type()) {
case MASTER_CHANGED:
- if (event.roleInfo().master() != null) {
- setupPipelineHandler(event.subject());
- log.info("mastership changed on device {}", event.subject());
- }
+ log.info("mastership changed on device {}", event.subject());
+ setupPipelineHandler(event.subject());
break;
case BACKUPS_CHANGED:
break;
@@ -259,13 +278,14 @@
public void event(DeviceEvent event) {
switch (event.type()) {
case DEVICE_ADDED:
+ setupPipelineHandler(event.subject().id());
+ break;
case DEVICE_AVAILABILITY_CHANGED:
log.info("Device either added or availability changed {}",
event.subject().id());
if (deviceService.isAvailable(event.subject().id())) {
log.info("Device is now available {}", event.subject().id());
setupPipelineHandler(event.subject().id());
- processPendingObjectives(event.subject().id());
}
break;
case DEVICE_UPDATED:
@@ -284,22 +304,6 @@
break;
}
}
-
- private void processPendingObjectives(DeviceId deviceId) {
- log.debug("Processing pending objectives for device {}", deviceId);
-
- pendingObjectives.getOrDefault(deviceId,
- Collections.emptySet()).forEach(obj -> {
- if (obj instanceof NextObjective) {
- next(deviceId, (NextObjective) obj);
- } else if (obj instanceof ForwardingObjective) {
- forward(deviceId, (ForwardingObjective) obj);
- } else {
- getDevicePipeliner(deviceId)
- .filter((FilteringObjective) obj);
- }
- });
- }
}
// Processing context for initializing pipeline driver behaviours.
@@ -313,8 +317,6 @@
public FlowObjectiveStore store() {
return flowObjectiveStore;
}
-
-
}
private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
@@ -356,7 +358,5 @@
public ForwardingObjective forwardingObjective() {
return fwd;
}
-
-
}
}
diff --git a/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java b/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
index 9dd0634..a64b678 100644
--- a/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
+++ b/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
@@ -27,12 +27,17 @@
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
-import org.onosproject.net.flow.DefaultFlowRule;
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flowobjective.DefaultForwardingObjective;
+import org.onosproject.net.flowobjective.FlowObjectiveService;
+import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveContext;
+import org.onosproject.net.flowobjective.ObjectiveError;
import org.onosproject.net.packet.DefaultPacketRequest;
import org.onosproject.net.packet.OutboundPacket;
import org.onosproject.net.packet.PacketContext;
@@ -73,6 +78,9 @@
private CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private FlowObjectiveService objectiveService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -169,12 +177,28 @@
return;
}
- TrafficTreatment treatment = DefaultTrafficTreatment.builder().punt().build();
- FlowRule flow = new DefaultFlowRule(device.id(), request.selector(), treatment,
- request.priority().priorityValue(),
- appId, 0, true, request.tableType());
+ TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+ .punt()
+ .build();
- flowService.applyFlowRules(flow);
+ ForwardingObjective forwarding = DefaultForwardingObjective.builder()
+ .withPriority(request.priority().priorityValue())
+ .withSelector(request.selector())
+ .fromApp(appId)
+ .withFlag(ForwardingObjective.Flag.VERSATILE)
+ .withTreatment(treatment)
+ .makePermanent()
+ .add(new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) { }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ log.warn("Failed to install packet request flow: {}", error);
+ }
+ });
+
+ objectiveService.forward(device.id(), forwarding);
}
@Override
diff --git a/drivers/src/main/java/org/onosproject/driver/pipeline/DefaultDrivers.java b/drivers/src/main/java/org/onosproject/driver/pipeline/DefaultDrivers.java
index 580691b..bfa9c32 100644
--- a/drivers/src/main/java/org/onosproject/driver/pipeline/DefaultDrivers.java
+++ b/drivers/src/main/java/org/onosproject/driver/pipeline/DefaultDrivers.java
@@ -16,32 +16,31 @@
package org.onosproject.driver.pipeline;
import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.onosproject.net.driver.DriverAdminService;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.net.driver.DefaultDriverProviderService;
+import org.onosproject.net.driver.Driver;
import org.onosproject.net.driver.DriverProvider;
import org.onosproject.net.driver.XmlDriverLoader;
-import org.apache.felix.scr.annotations.Component;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Set;
/**
* Bootstrap for built in drivers.
*/
-@Component(immediate = true)
-public class DefaultDrivers {
+@Service
+@Component(immediate = false)
+public class DefaultDrivers implements DefaultDriverProviderService {
private final Logger log = LoggerFactory.getLogger(getClass());
private static final String DRIVERS_XML = "/onos-drivers.xml";
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected DriverAdminService driverService;
-
private DriverProvider provider;
@Activate
@@ -50,7 +49,6 @@
try {
InputStream stream = classLoader.getResourceAsStream(DRIVERS_XML);
provider = new XmlDriverLoader(classLoader).loadDrivers(stream);
- driverService.registerProvider(provider);
} catch (IOException e) {
log.error("Unable to load default drivers", e);
}
@@ -59,10 +57,11 @@
@Deactivate
protected void deactivate() {
- if (provider != null) {
- driverService.unregisterProvider(provider);
- }
log.info("Stopped");
}
+ @Override
+ public Set<Driver> getDrivers() {
+ return provider.getDrivers();
+ }
}