// Source blob: 4f8404ba26a8189c1d772173167df38bd427bfe7
/*
* Copyright 2018-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.flowobjective.impl;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalListeners;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimaps;
import org.onlab.util.Tools;
import org.onlab.util.Tools.LogLevel;
import org.onosproject.net.DeviceId;
import org.onosproject.net.flowobjective.FilteringObjQueueKey;
import org.onosproject.net.flowobjective.FilteringObjective;
import org.onosproject.net.flowobjective.FlowObjectiveService;
import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
import org.onosproject.net.flowobjective.ForwardingObjQueueKey;
import org.onosproject.net.flowobjective.ForwardingObjective;
import org.onosproject.net.flowobjective.NextObjQueueKey;
import org.onosproject.net.flowobjective.NextObjective;
import org.onosproject.net.flowobjective.Objective;
import org.onosproject.net.flowobjective.ObjectiveContext;
import org.onosproject.net.flowobjective.ObjectiveError;
import org.onosproject.net.flowobjective.ObjectiveEvent;
import org.onosproject.net.flowobjective.ObjectiveQueueKey;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
/**
 * Flow objective manager that executes objectives sharing the same queue key strictly in order.
 *
 * <p>Every objective submitted through {@code filter}, {@code forward} or {@code next} is wrapped
 * with an {@link InOrderObjectiveContext} and appended to a per-key queue. Only the objective at
 * the head of a queue is handed to the parent {@link FlowObjectiveManager}; the following one is
 * submitted when the head reports success or failure, or when its entry in the queue-head cache
 * is evicted (which is reported as {@link ObjectiveError#INSTALLATIONTIMEOUT}).
 */
@Component(immediate = true, service = FlowObjectiveService.class)
public class InOrderFlowObjectiveManager extends FlowObjectiveManager {
    private final Logger log = LoggerFactory.getLogger(getClass());

    // TODO Make queue timeout configurable
    static final int DEFAULT_OBJ_TIMEOUT = 15000;

    /** Time in milliseconds an objective may stay at the head of its queue before it times out. */
    int objTimeoutMs = DEFAULT_OBJ_TIMEOUT;

    // Objectives currently submitted to the parent manager, one per queue key.
    // Entries expire objTimeoutMs after being written.
    private Cache<FilteringObjQueueKey, Objective> filtObjQueueHead;
    private Cache<ForwardingObjQueueKey, Objective> fwdObjQueueHead;
    private Cache<NextObjQueueKey, Objective> nextObjQueueHead;

    // Periodically triggers cache maintenance so that expired heads are evicted
    // even when the caches are otherwise idle.
    private ScheduledExecutorService cacheCleaner;

    // Single-threaded executors on which the cache removal listeners run.
    private ExecutorService filtCacheEventExecutor;
    private ExecutorService fwdCacheEventExecutor;
    private ExecutorService nextCacheEventExecutor;

    // Per-key queues; the first element of each list is the objective currently being executed.
    private ListMultimap<FilteringObjQueueKey, Objective> filtObjQueue =
            Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
    private ListMultimap<ForwardingObjQueueKey, Objective> fwdObjQueue =
            Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
    private ListMultimap<NextObjQueueKey, Objective> nextObjQueue =
            Multimaps.synchronizedListMultimap(ArrayListMultimap.create());

    /** Store delegate installed in place of the parent's one; note that it hides {@code super.delegate}. */
    final FlowObjectiveStoreDelegate delegate = new InternalStoreDelegate();

    /**
     * Activates the parent manager, creates the queue-head caches with their removal listeners,
     * starts the periodic cache clean-up and swaps the store delegate for this class's one.
     */
    @Activate
    protected void activate() {
        super.activate();

        filtCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-filt", log));
        fwdCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-fwd", log));
        nextCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-next", log));

        // Any eviction that was not requested by this class means the head never completed in time
        RemovalListener<ObjectiveQueueKey, Objective> removalListener = notification -> {
            Objective obj = notification.getValue();
            switch (notification.getCause()) {
                case EXPIRED:
                case COLLECTED:
                case SIZE:
                    obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
                    break;
                case EXPLICIT: // No action when the objective completes correctly
                case REPLACED: // No action when a pending forward or next objective gets executed
                default:
                    break;
            }
        };

        filtObjQueueHead = newQueueHeadCache(removalListener, filtCacheEventExecutor);
        fwdObjQueueHead = newQueueHeadCache(removalListener, fwdCacheEventExecutor);
        nextObjQueueHead = newQueueHeadCache(removalListener, nextCacheEventExecutor);

        cacheCleaner = newSingleThreadScheduledExecutor(groupedThreads("onos/flowobj", "cache-cleaner", log));
        cacheCleaner.scheduleAtFixedRate(() -> {
            filtObjQueueHead.cleanUp();
            fwdObjQueueHead.cleanUp();
            nextObjQueueHead.cleanUp();
        }, 0, objTimeoutMs, TimeUnit.MILLISECONDS);

        // Replace store delegate to make sure pendingForward and pendingNext are resubmitted to
        // execute()
        flowObjectiveStore.unsetDelegate(super.delegate);
        flowObjectiveStore.setDelegate(delegate);
    }

    /**
     * Stops the cache clean-up, drops all queued objectives, shuts down the removal-listener
     * executors, removes this class's store delegate and deactivates the parent manager.
     */
    @Deactivate
    protected void deactivate() {
        cacheCleaner.shutdown();
        clearQueue();

        filtCacheEventExecutor.shutdown();
        fwdCacheEventExecutor.shutdown();
        nextCacheEventExecutor.shutdown();

        // activate() installed this class's delegate on the store. The field hides the parent's
        // one, so the parent cannot refer to it by name; remove it here so the store does not
        // keep a reference to a deactivated component.
        // NOTE(review): presumably super.deactivate() only unsets its own delegate - confirm.
        flowObjectiveStore.unsetDelegate(delegate);

        super.deactivate();
    }

    /**
     * Builds a queue-head cache whose entries expire {@code objTimeoutMs} after being written and
     * whose removals are reported to the given listener on the given executor.
     *
     * @param listener listener notified of every removal
     * @param executor executor on which the listener is invoked
     * @param <K> queue key type
     * @return new empty cache
     */
    private <K extends ObjectiveQueueKey> Cache<K, Objective> newQueueHeadCache(
            RemovalListener<ObjectiveQueueKey, Objective> listener, ExecutorService executor) {
        return CacheBuilder.newBuilder()
                .expireAfterWrite(objTimeoutMs, TimeUnit.MILLISECONDS)
                .removalListener(RemovalListeners.asynchronous(listener, executor))
                .build();
    }

    /**
     * Processes given objective on given device.
     * Objectives submitted through this method are guaranteed to be executed in order.
     *
     * @param deviceId Device ID
     * @param originalObjective Flow objective to be executed
     */
    private void process(DeviceId deviceId, Objective originalObjective) {
        // Inject ObjectiveContext such that we can get notified when it is completed
        Objective.Builder objBuilder = originalObjective.copy();
        Optional<ObjectiveContext> originalContext = originalObjective.context();
        ObjectiveContext context = new InOrderObjectiveContext(deviceId, originalContext.orElse(null));

        // Preserve Objective.Operation
        Objective objective;
        switch (originalObjective.op()) {
            case ADD:
                objective = objBuilder.add(context);
                break;
            case ADD_TO_EXISTING:
                objective = ((NextObjective.Builder) objBuilder).addToExisting(context);
                break;
            case REMOVE:
                objective = objBuilder.remove(context);
                break;
            case REMOVE_FROM_EXISTING:
                objective = ((NextObjective.Builder) objBuilder).removeFromExisting(context);
                break;
            case MODIFY:
                objective = ((NextObjective.Builder) objBuilder).modify(context);
                break;
            case VERIFY:
                objective = ((NextObjective.Builder) objBuilder).verify(context);
                break;
            default:
                log.error("Unknown flow objective operation {}", originalObjective.op());
                return;
        }
        enqueue(deviceId, objective);
    }

    /**
     * Queues the filtering objective for in-order execution on the device.
     *
     * @param deviceId Device ID
     * @param filteringObjective Filtering objective
     */
    @Override
    public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
        process(deviceId, filteringObjective);
    }

    /**
     * Queues the forwarding objective for in-order execution on the device.
     *
     * @param deviceId Device ID
     * @param forwardingObjective Forwarding objective
     */
    @Override
    public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
        process(deviceId, forwardingObjective);
    }

    /**
     * Queues the next objective for in-order execution on the device.
     *
     * @param deviceId Device ID
     * @param nextObjective Next objective
     */
    @Override
    public void next(DeviceId deviceId, NextObjective nextObjective) {
        process(deviceId, nextObjective);
    }

    /**
     * Returns the filtering objective queues. This is the live multimap, not a copy.
     *
     * @return filtering objective queues
     */
    @Override
    public ListMultimap<FilteringObjQueueKey, Objective> getFilteringObjQueue() {
        return filtObjQueue;
    }

    /**
     * Returns the forwarding objective queues. This is the live multimap, not a copy.
     *
     * @return forwarding objective queues
     */
    @Override
    public ListMultimap<ForwardingObjQueueKey, Objective> getForwardingObjQueue() {
        return fwdObjQueue;
    }

    /**
     * Returns the next objective queues. This is the live multimap, not a copy.
     *
     * @return next objective queues
     */
    @Override
    public ListMultimap<NextObjQueueKey, Objective> getNextObjQueue() {
        return nextObjQueue;
    }

    /**
     * Returns a map view of the filtering objectives currently at the head of their queues.
     *
     * @return filtering objective queue heads
     */
    @Override
    public Map<FilteringObjQueueKey, Objective> getFilteringObjQueueHead() {
        return filtObjQueueHead.asMap();
    }

    /**
     * Returns a map view of the forwarding objectives currently at the head of their queues.
     *
     * @return forwarding objective queue heads
     */
    @Override
    public Map<ForwardingObjQueueKey, Objective> getForwardingObjQueueHead() {
        return fwdObjQueueHead.asMap();
    }

    /**
     * Returns a map view of the next objectives currently at the head of their queues.
     *
     * @return next objective queue heads
     */
    @Override
    public Map<NextObjQueueKey, Objective> getNextObjQueueHead() {
        return nextObjQueueHead.asMap();
    }

    /**
     * Invalidates every queue-head entry and empties all queues.
     * The invalidation is explicit, so the removal listener does not report a timeout for the
     * dropped heads.
     */
    @Override
    public void clearQueue() {
        filtObjQueueHead.invalidateAll();
        fwdObjQueueHead.invalidateAll();
        nextObjQueueHead.invalidateAll();

        filtObjQueueHead.cleanUp();
        fwdObjQueueHead.cleanUp();
        nextObjQueueHead.cleanUp();

        filtObjQueue.clear();
        fwdObjQueue.clear();
        nextObjQueue.clear();
    }

    /**
     * Picks the level used to log queue activity for the given objective:
     * TRACE for VERIFY operations, DEBUG for everything else.
     *
     * @param obj Flow objective
     * @return log level
     */
    private static LogLevel logLevelFor(Objective obj) {
        return (obj.op() == Objective.Operation.VERIFY) ? LogLevel.TRACE : LogLevel.DEBUG;
    }

    /** Builds the queue key of a filtering objective from device, priority and filtering key. */
    private static FilteringObjQueueKey filteringKey(DeviceId deviceId, FilteringObjective obj) {
        return new FilteringObjQueueKey(deviceId, obj.priority(), obj.key());
    }

    /** Builds the queue key of a forwarding objective from device, priority and selector. */
    private static ForwardingObjQueueKey forwardingKey(DeviceId deviceId, ForwardingObjective obj) {
        return new ForwardingObjQueueKey(deviceId, obj.priority(), obj.selector());
    }

    /** Builds the queue key of a next objective from device and objective id. */
    private static NextObjQueueKey nextKey(DeviceId deviceId, NextObjective obj) {
        return new NextObjQueueKey(deviceId, obj.id());
    }

    /**
     * Enqueue flow objective. Execute the flow objective if there is no pending objective ahead.
     *
     * @param deviceId Device ID
     * @param obj Flow objective
     */
    private synchronized void enqueue(DeviceId deviceId, Objective obj) {
        int queueSize;
        Tools.log(log, logLevelFor(obj), "Enqueue {}", obj);

        if (obj instanceof FilteringObjective) {
            FilteringObjQueueKey k = filteringKey(deviceId, (FilteringObjective) obj);
            filtObjQueue.put(k, obj);
            queueSize = filtObjQueue.get(k).size();
        } else if (obj instanceof ForwardingObjective) {
            ForwardingObjQueueKey k = forwardingKey(deviceId, (ForwardingObjective) obj);
            fwdObjQueue.put(k, obj);
            queueSize = fwdObjQueue.get(k).size();
        } else if (obj instanceof NextObjective) {
            NextObjQueueKey k = nextKey(deviceId, (NextObjective) obj);
            nextObjQueue.put(k, obj);
            queueSize = nextObjQueue.get(k).size();
        } else {
            log.error("Unknown flow objective instance: {}", obj.getClass().getName());
            return;
        }
        log.trace("{} queue size {}", obj.getClass().getSimpleName(), queueSize);

        // Execute immediately if there is no pending obj ahead
        if (queueSize == 1) {
            execute(deviceId, obj);
        }
    }

    /**
     * Dequeue flow objective. Execute the next flow objective in the queue, if any.
     *
     * <p>The queue-head entry is invalidated unless the error is INSTALLATIONTIMEOUT; the removal
     * listener reports that error for entries the cache has already removed.
     *
     * @param deviceId Device ID
     * @param obj Flow objective
     * @param error ObjectiveError that triggers this dequeue. Null if this is not triggered by an error.
     */
    private synchronized void dequeue(DeviceId deviceId, Objective obj, ObjectiveError error) {
        List<Objective> remaining;
        boolean timedOut = Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error);
        Tools.log(log, logLevelFor(obj), "Dequeue {}", obj);

        if (obj instanceof FilteringObjective) {
            FilteringObjQueueKey k = filteringKey(deviceId, (FilteringObjective) obj);
            if (!timedOut) {
                filtObjQueueHead.invalidate(k);
            }
            filtObjQueue.remove(k, obj);
            remaining = filtObjQueue.get(k);
        } else if (obj instanceof ForwardingObjective) {
            ForwardingObjQueueKey k = forwardingKey(deviceId, (ForwardingObjective) obj);
            if (!timedOut) {
                fwdObjQueueHead.invalidate(k);
            }
            fwdObjQueue.remove(k, obj);
            remaining = fwdObjQueue.get(k);
        } else if (obj instanceof NextObjective) {
            if (error != null) {
                // Remove pendingForwards and pendingNexts if next objective failed
                Set<PendingFlowObjective> removedForwards = pendingForwards.remove(obj.id());
                List<PendingFlowObjective> removedNexts = pendingNexts.remove(obj.id());
                failPending(removedForwards, error);
                failPending(removedNexts, error);
            }

            NextObjQueueKey k = nextKey(deviceId, (NextObjective) obj);
            if (!timedOut) {
                nextObjQueueHead.invalidate(k);
            }
            nextObjQueue.remove(k, obj);
            remaining = nextObjQueue.get(k);
        } else {
            log.error("Unknown flow objective instance: {}", obj.getClass().getName());
            return;
        }
        log.trace("{} queue size {}", obj.getClass().getSimpleName(), remaining.size());

        // Submit the next one in the queue, if any
        if (!remaining.isEmpty()) {
            execute(deviceId, remaining.get(0));
        }
    }

    /**
     * Reports the given error to the context of every pending objective in the collection.
     *
     * @param pending pending objectives that will never be executed; may be null
     * @param error error to report
     */
    private void failPending(Iterable<PendingFlowObjective> pending, ObjectiveError error) {
        if (pending == null) {
            return;
        }
        for (PendingFlowObjective p : pending) {
            Objective pendingObj = p.flowObjective();
            pendingObj.context().ifPresent(c -> c.onError(pendingObj, error));
        }
    }

    /**
     * Submit the flow objective. Starting from this point on, the execution order is not guaranteed.
     * Therefore we must be certain that this method is called in-order.
     *
     * <p>The objective is recorded as the head of its queue before being passed to the parent manager.
     *
     * @param deviceId Device ID
     * @param obj Flow objective
     */
    private void execute(DeviceId deviceId, Objective obj) {
        Tools.log(log, logLevelFor(obj), "Submit objective installer, deviceId {}, obj {}", deviceId, obj);

        if (obj instanceof FilteringObjective) {
            FilteringObjective filtObj = (FilteringObjective) obj;
            filtObjQueueHead.put(filteringKey(deviceId, filtObj), obj);
            super.filter(deviceId, filtObj);
        } else if (obj instanceof ForwardingObjective) {
            ForwardingObjective fwdObj = (ForwardingObjective) obj;
            fwdObjQueueHead.put(forwardingKey(deviceId, fwdObj), obj);
            super.forward(deviceId, fwdObj);
        } else if (obj instanceof NextObjective) {
            NextObjective nextObj = (NextObjective) obj;
            nextObjQueueHead.put(nextKey(deviceId, nextObj), obj);
            super.next(deviceId, nextObj);
        } else {
            log.error("Unknown flow objective instance: {}", obj.getClass().getName());
        }
    }

    /**
     * Store delegate that, on an ADD event, resubmits the forwarding and next objectives that were
     * pending on the event's next id through {@code execute()}.
     */
    private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
        @Override
        public void notify(ObjectiveEvent event) {
            if (event.type() != ObjectiveEvent.Type.ADD) {
                return;
            }
            log.debug("Received notification of obj event {}", event);

            Set<PendingFlowObjective> pending;
            // first send all pending flows
            synchronized (pendingForwards) {
                // needs to be synchronized for queueObjective lookup
                pending = pendingForwards.remove(event.subject());
            }
            if (pending == null) {
                log.debug("No forwarding objectives pending for this obj event {}", event);
            } else {
                log.debug("Processing {} pending forwarding objectives for nextId {}",
                        pending.size(), event.subject());
                // execute pending forwards one by one
                pending.forEach(p -> execute(p.deviceId(), p.flowObjective()));
            }

            // now check for pending next-objectives
            // Note: This is still necessary despite the existence of in-order execution.
            //       Since the in-order execution does not handle the case of
            //       ADD_TO_EXISTING coming before ADD
            List<PendingFlowObjective> pendNexts;
            synchronized (pendingNexts) {
                // needs to be synchronized for queueObjective lookup
                pendNexts = pendingNexts.remove(event.subject());
            }
            if (pendNexts == null) {
                log.debug("No next objectives pending for this obj event {}", event);
            } else {
                log.debug("Processing {} pending next objectives for nextId {}",
                        pendNexts.size(), event.subject());
                // execute pending nexts one by one
                pendNexts.forEach(p -> execute(p.deviceId(), p.flowObjective()));
            }
        }
    }

    /**
     * Objective context injected into every queued objective. On the first completion callback it
     * dequeues the objective (which submits the next one, if any) and then forwards the callback
     * to the caller's original context; later callbacks are ignored.
     */
    final class InOrderObjectiveContext implements ObjectiveContext {
        private final DeviceId deviceId;
        // Context supplied by the caller; null when the original objective had none
        private final ObjectiveContext originalContext;
        // Prevent context from being executed multiple times.
        // E.g. when the context actually succeed after the cache timeout
        private final AtomicBoolean done;

        /**
         * Creates a context bound to the given device.
         *
         * @param deviceId Device ID the objective is submitted to
         * @param originalContext caller's context to forward callbacks to; may be null
         */
        InOrderObjectiveContext(DeviceId deviceId, ObjectiveContext originalContext) {
            this.deviceId = deviceId;
            this.originalContext = originalContext;
            this.done = new AtomicBoolean(false);
        }

        @Override
        public void onSuccess(Objective objective) {
            log.trace("Flow objective onSuccess {}", objective);

            if (!done.getAndSet(true)) {
                dequeue(deviceId, objective, null);
                if (originalContext != null) {
                    originalContext.onSuccess(objective);
                }
            }
        }

        @Override
        public void onError(Objective objective, ObjectiveError error) {
            log.warn("Flow objective onError {}. Reason = {}", objective, error);

            if (!done.getAndSet(true)) {
                dequeue(deviceId, objective, error);
                if (originalContext != null) {
                    originalContext.onError(objective, error);
                }
            }
        }
    }
}