Implements in-order flow objective execution
Change-Id: I6d11e25bc44c07ef8488e51b63c3ff8a88d98f9c
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
index fdba591..20e55a5 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
@@ -80,12 +80,12 @@
/**
* Provides implementation of the flow objective programming service.
*/
-@Component(immediate = true)
+@Component(enabled = false)
@Service
public class FlowObjectiveManager implements FlowObjectiveService {
- public static final int INSTALL_RETRY_ATTEMPTS = 5;
- public static final long INSTALL_RETRY_INTERVAL = 1000; // ms
+ private static final int INSTALL_RETRY_ATTEMPTS = 5;
+ private static final long INSTALL_RETRY_INTERVAL = 1000; // ms
private static final String WORKER_PATTERN = "objective-installer-%d";
private static final String GROUP_THREAD_NAME = "onos/objective-installer";
@@ -111,9 +111,11 @@
// Note: The following dependencies are added on behalf of the pipeline
// driver behaviours to assure these services are available for their
// initialization.
+ @SuppressWarnings("unused")
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowRuleService flowRuleService;
+ @SuppressWarnings("unused")
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected GroupService groupService;
@@ -123,31 +125,30 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ComponentConfigService cfgService;
- private final FlowObjectiveStoreDelegate delegate = new InternalStoreDelegate();
+ final FlowObjectiveStoreDelegate delegate = new InternalStoreDelegate();
private final Map<DeviceId, DriverHandler> driverHandlers = Maps.newConcurrentMap();
- private final Map<DeviceId, Pipeliner> pipeliners = Maps.newConcurrentMap();
+ protected final Map<DeviceId, Pipeliner> pipeliners = Maps.newConcurrentMap();
private final PipelinerContext context = new InnerPipelineContext();
private final DeviceListener deviceListener = new InnerDeviceListener();
private final DriverListener driverListener = new InnerDriverListener();
- protected ServiceDirectory serviceDirectory = new DefaultServiceDirectory();
+ private ServiceDirectory serviceDirectory = new DefaultServiceDirectory();
// local stores for queuing fwd and next objectives that are waiting for an
// associated next objective execution to complete. The signal for completed
// execution comes from a pipeline driver, in this or another controller
// instance, via the DistributedFlowObjectiveStore.
- private final Map<Integer, Set<PendingFlowObjective>> pendingForwards =
- Maps.newConcurrentMap();
- private final Map<Integer, List<PendingFlowObjective>> pendingNexts =
- Maps.newConcurrentMap();
+ // TODO Making these cache and timeout the entries
+ final Map<Integer, Set<PendingFlowObjective>> pendingForwards = Maps.newConcurrentMap();
+ final Map<Integer, List<PendingFlowObjective>> pendingNexts = Maps.newConcurrentMap();
// local store to track which nextObjectives were sent to which device
// for debugging purposes
private Map<Integer, DeviceId> nextToDevice = Maps.newConcurrentMap();
- private ExecutorService executorService;
+ ExecutorService executorService;
@Activate
protected void activate() {
@@ -196,17 +197,17 @@
* make a few attempts to find the appropriate driver, then eventually give
* up and report an error if no suitable driver could be found.
*/
- private class ObjectiveInstaller implements Runnable {
- private final DeviceId deviceId;
- private final Objective objective;
+ class ObjectiveInstaller implements Runnable {
+ final DeviceId deviceId;
+ final Objective objective;
private final int numAttempts;
- public ObjectiveInstaller(DeviceId deviceId, Objective objective) {
+ ObjectiveInstaller(DeviceId deviceId, Objective objective) {
this(deviceId, objective, 1);
}
- public ObjectiveInstaller(DeviceId deviceId, Objective objective, int attemps) {
+ ObjectiveInstaller(DeviceId deviceId, Objective objective, int attemps) {
this.deviceId = checkNotNull(deviceId);
this.objective = checkNotNull(objective);
this.numAttempts = attemps;
@@ -281,7 +282,7 @@
public void initPolicy(String policy) {
}
- private boolean queueFwdObjective(DeviceId deviceId, ForwardingObjective fwd) {
+ boolean queueFwdObjective(DeviceId deviceId, ForwardingObjective fwd) {
boolean queued = false;
synchronized (pendingForwards) {
// double check the flow objective store, because this block could run
@@ -306,8 +307,7 @@
return queued;
}
- private boolean queueNextObjective(DeviceId deviceId, NextObjective next) {
-
+ boolean queueNextObjective(DeviceId deviceId, NextObjective next) {
// we need to hold off on other operations till we get notified that the
// initial group creation has succeeded
boolean queued = false;
@@ -578,11 +578,11 @@
* removeFromExisting) waiting for a next objective with the same id to
* complete execution.
*/
- private class PendingFlowObjective {
+ protected class PendingFlowObjective {
private final DeviceId deviceId;
private final Objective flowObj;
- public PendingFlowObjective(DeviceId deviceId, Objective flowObj) {
+ PendingFlowObjective(DeviceId deviceId, Objective flowObj) {
this.deviceId = deviceId;
this.flowObj = flowObj;
}
@@ -609,11 +609,9 @@
return false;
}
final PendingFlowObjective other = (PendingFlowObjective) obj;
- if (this.deviceId.equals(other.deviceId) &&
- this.flowObj.equals(other.flowObj)) {
- return true;
- }
- return false;
+
+ return this.deviceId.equals(other.deviceId) &&
+ this.flowObj.equals(other.flowObj);
}
}
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
new file mode 100644
index 0000000..f6e6dd8
--- /dev/null
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
@@ -0,0 +1,376 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.flowobjective.impl;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Multimaps;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.criteria.Criterion;
+import org.onosproject.net.flowobjective.FilteringObjective;
+import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
+import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.NextObjective;
+import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveContext;
+import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.flowobjective.ObjectiveEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+@Component(immediate = true, enabled = true)
+@Service
+public class InOrderFlowObjectiveManager extends FlowObjectiveManager {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ // TODO Making these cache and timeout the entries
+ private ListMultimap<FiltObjQueueKey, Objective> filtObjQueue =
+ Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
+ private ListMultimap<FwdObjQueueKey, Objective> fwdObjQueue =
+ Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
+ private ListMultimap<NextObjQueueKey, Objective> nextObjQueue =
+ Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
+
+ final FlowObjectiveStoreDelegate delegate = new InternalStoreDelegate();
+
+ @Activate
+ protected void activate() {
+ super.activate();
+ // Replace store delegate to make sure pendingForward and pendingNext are resubmitted to
+ // process()
+ flowObjectiveStore.unsetDelegate(super.delegate);
+ flowObjectiveStore.setDelegate(delegate);
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ super.deactivate();
+ }
+
+ /**
+ * Processes given objective on given device.
+ * Objectives submitted through this method are guaranteed to be executed in order.
+ *
+ * @param deviceId Device ID
+ * @param originalObjective Flow objective to be executed
+ */
+ private void process(DeviceId deviceId, Objective originalObjective) {
+ // Inject ObjectiveContext such that we can get notified when it is completed
+ Objective.Builder objBuilder = originalObjective.copy();
+ Optional<ObjectiveContext> originalContext = originalObjective.context();
+ ObjectiveContext context = new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ log.trace("Flow objective onSuccess {}", objective);
+ dequeue(deviceId, objective);
+ originalContext.ifPresent(c -> c.onSuccess(objective));
+ }
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ log.trace("Flow objective onError {}", objective);
+ dequeue(deviceId, objective);
+ originalContext.ifPresent(c -> c.onError(objective, error));
+ }
+ };
+
+ // Preserve Objective.Operation
+ Objective objective;
+ switch (originalObjective.op()) {
+ case ADD:
+ objective = objBuilder.add(context);
+ break;
+ case ADD_TO_EXISTING:
+ objective = ((NextObjective.Builder) objBuilder).addToExisting(context);
+ break;
+ case REMOVE:
+ objective = objBuilder.remove(context);
+ break;
+ case REMOVE_FROM_EXISTING:
+ objective = ((NextObjective.Builder) objBuilder).removeFromExisting(context);
+ break;
+ case MODIFY:
+ objective = ((NextObjective.Builder) objBuilder).modify(context);
+ break;
+ case VERIFY:
+ objective = ((NextObjective.Builder) objBuilder).verify(context);
+ break;
+ default:
+ log.error("Unknown flow objecitve operation {}", originalObjective.op());
+ return;
+ }
+
+ enqueue(deviceId, objective);
+ }
+
+ @Override
+ public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
+ process(deviceId, filteringObjective);
+ }
+
+ @Override
+ public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
+ process(deviceId, forwardingObjective);
+ }
+
+ @Override
+ public void next(DeviceId deviceId, NextObjective nextObjective) {
+ process(deviceId, nextObjective);
+ }
+
+ /**
+ * Enqueue flow objective. Execute the flow objective if there is no pending objective ahead.
+ *
+ * @param deviceId Device ID
+ * @param obj Flow objective
+ */
+ private synchronized void enqueue(DeviceId deviceId, Objective obj) {
+ int queueSize;
+ int priority = obj.priority();
+
+ if (obj instanceof FilteringObjective) {
+ log.debug("Enqueue filtering objective {}", obj);
+ FiltObjQueueKey k = new FiltObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
+ filtObjQueue.put(k, obj);
+ queueSize = filtObjQueue.get(k).size();
+ log.debug("Filtering objective queue size {}", queueSize);
+ } else if (obj instanceof ForwardingObjective) {
+ log.debug("Enqueue forwarding objective {}", obj);
+ FwdObjQueueKey k = new FwdObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
+ fwdObjQueue.put(k, obj);
+ queueSize = fwdObjQueue.get(k).size();
+ log.debug("Forwarding objective queue size {}", queueSize);
+ } else if (obj instanceof NextObjective) {
+ log.debug("Enqueue next objective {}", obj);
+ NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
+ nextObjQueue.put(k, obj);
+ queueSize = nextObjQueue.get(k).size();
+ log.debug("Next objective queue size {}", queueSize);
+ } else {
+ log.error("Unknown flow objective instance: {}", obj.getClass().getName());
+ return;
+ }
+
+ // Execute immediately if there is no pending obj ahead
+ if (queueSize == 1) {
+ log.debug("First one. Submit objective installer, deviceId {}, obj {}", deviceId, obj);
+ execute(deviceId, obj);
+ }
+ }
+
+ /**
+ * Dequeue flow objective. Execute the next flow objective in the queue, if any.
+ *
+ * @param deviceId Device ID
+ * @param obj Flow objective
+ */
+ private synchronized void dequeue(DeviceId deviceId, Objective obj) {
+ List<Objective> remaining;
+ int priority = obj.priority();
+
+ if (obj instanceof FilteringObjective) {
+ log.debug("Dequeue filtering objective {}", obj);
+ FiltObjQueueKey k = new FiltObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
+ filtObjQueue.remove(k, obj);
+ remaining = filtObjQueue.get(k);
+ log.debug("Filtering objective queue size {}", remaining.size());
+ } else if (obj instanceof ForwardingObjective) {
+ log.debug("Dequeue forwarding objective {}", obj);
+ FwdObjQueueKey k = new FwdObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
+ fwdObjQueue.remove(k, obj);
+ remaining = fwdObjQueue.get(k);
+ log.debug("Forwarding objective queue size {}", remaining.size());
+ } else if (obj instanceof NextObjective) {
+ log.debug("Dequeue next objective {}", obj);
+ NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
+ nextObjQueue.remove(k, obj);
+ remaining = nextObjQueue.get(k);
+ log.debug("Next objective queue size {}", remaining.size());
+ } else {
+ log.error("Unknown flow objective instance: {}", obj.getClass().getName());
+ return;
+ }
+
+ // Submit the next one in the queue, if any
+ if (remaining.size() > 0) {
+ log.debug("Next one. Submit objective installer, deviceId {}, obj {}", deviceId, obj);
+ execute(deviceId, remaining.get(0));
+ }
+ }
+
+ /**
+ * Submit the flow objective. Starting from this point on, the execution order is not guaranteed.
+ * Therefore we must be certain that this method is called in-order.
+ *
+ * @param deviceId Device ID
+ * @param obj Flow objective
+ */
+ private void execute(DeviceId deviceId, Objective obj) {
+ if (obj instanceof FilteringObjective) {
+ super.filter(deviceId, (FilteringObjective) obj);
+ } else if (obj instanceof ForwardingObjective) {
+ super.forward(deviceId, (ForwardingObjective) obj);
+ } else if (obj instanceof NextObjective) {
+ super.next(deviceId, (NextObjective) obj);
+ } else {
+ log.error("Unknown flow objective instance: {}", obj.getClass().getName());
+ }
+ }
+
+ private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
+ @Override
+ public void notify(ObjectiveEvent event) {
+ if (event.type() == ObjectiveEvent.Type.ADD) {
+ log.debug("Received notification of obj event {}", event);
+ Set<PendingFlowObjective> pending;
+
+ // first send all pending flows
+ synchronized (pendingForwards) {
+ // needs to be synchronized for queueObjective lookup
+ pending = pendingForwards.remove(event.subject());
+ }
+ if (pending == null) {
+ log.debug("No forwarding objectives pending for this "
+ + "obj event {}", event);
+ } else {
+ log.debug("Processing {} pending forwarding objectives for nextId {}",
+ pending.size(), event.subject());
+ // resubmitted back to the execution queue
+ pending.forEach(p -> execute(p.deviceId(), p.flowObjective()));
+ }
+
+ // now check for pending next-objectives
+ // Note: This is still necessary despite the existence of in-order execution.
+ // Since the in-order execution does not handle the case of
+ // ADD_TO_EXISTING coming before ADD
+ List<PendingFlowObjective> pendNexts;
+ synchronized (pendingNexts) {
+ // needs to be synchronized for queueObjective lookup
+ pendNexts = pendingNexts.remove(event.subject());
+ }
+ if (pendNexts == null) {
+ log.debug("No next objectives pending for this "
+ + "obj event {}", event);
+ } else {
+ log.debug("Processing {} pending next objectives for nextId {}",
+ pendNexts.size(), event.subject());
+ // resubmitted back to the execution queue
+ pendNexts.forEach(p -> execute(p.deviceId(), p.flowObjective()));
+ }
+ }
+ }
+ }
+
+ private static class FiltObjQueueKey {
+ private DeviceId deviceId;
+ private int priority;
+ private Criterion key;
+
+ FiltObjQueueKey(DeviceId deviceId, int priority, Criterion key) {
+ this.deviceId = deviceId;
+ this.priority = priority;
+ this.key = key;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(deviceId, priority, key);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (!(other instanceof FiltObjQueueKey)) {
+ return false;
+ }
+ FiltObjQueueKey that = (FiltObjQueueKey) other;
+ return Objects.equals(this.deviceId, that.deviceId) &&
+ Objects.equals(this.priority, that.priority) &&
+ Objects.equals(this.key, that.key);
+ }
+ }
+
+ private static class FwdObjQueueKey {
+ private DeviceId deviceId;
+ private int priority;
+ private TrafficSelector selector;
+
+ FwdObjQueueKey(DeviceId deviceId, int priority, TrafficSelector selector) {
+ this.deviceId = deviceId;
+ this.priority = priority;
+ this.selector = selector;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(deviceId, priority, selector);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (!(other instanceof FwdObjQueueKey)) {
+ return false;
+ }
+ FwdObjQueueKey that = (FwdObjQueueKey) other;
+ return Objects.equals(this.deviceId, that.deviceId) &&
+ Objects.equals(this.priority, that.priority) &&
+ Objects.equals(this.selector, that.selector);
+ }
+ }
+
+ private static class NextObjQueueKey {
+ private DeviceId deviceId;
+ private int id;
+
+ NextObjQueueKey(DeviceId deviceId, int id) {
+ this.deviceId = deviceId;
+ this.id = id;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(deviceId, id);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (!(other instanceof NextObjQueueKey)) {
+ return false;
+ }
+ NextObjQueueKey that = (NextObjQueueKey) other;
+ return Objects.equals(this.deviceId, that.deviceId) &&
+ Objects.equals(this.id, that.id);
+ }
+ }
+}