blob: eac2116a5716d83ced4e001e9d94a815bcc5a465 [file] [log] [blame]
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.pipelines.fabric.impl.behaviour.pipeliner;
import com.google.common.collect.ImmutableList;
import org.onlab.util.KryoNamespace;
import org.onlab.util.SharedExecutors;
import org.onosproject.net.DeviceId;
import org.onosproject.net.PortNumber;
import org.onosproject.net.behaviour.NextGroup;
import org.onosproject.net.behaviour.Pipeliner;
import org.onosproject.net.behaviour.PipelinerContext;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleOperations;
import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flowobjective.FilteringObjective;
import org.onosproject.net.flowobjective.FlowObjectiveStore;
import org.onosproject.net.flowobjective.ForwardingObjective;
import org.onosproject.net.flowobjective.IdNextTreatment;
import org.onosproject.net.flowobjective.NextObjective;
import org.onosproject.net.flowobjective.NextTreatment;
import org.onosproject.net.flowobjective.Objective;
import org.onosproject.net.flowobjective.ObjectiveError;
import org.onosproject.net.group.GroupDescription;
import org.onosproject.net.group.GroupService;
import org.onosproject.pipelines.fabric.impl.behaviour.AbstractFabricHandlerBehavior;
import org.onosproject.pipelines.fabric.impl.behaviour.FabricCapabilities;
import org.onosproject.store.serializers.KryoNamespaces;
import org.slf4j.Logger;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import static java.lang.String.format;
import static org.onosproject.pipelines.fabric.impl.behaviour.FabricUtils.outputPort;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Pipeliner implementation for fabric pipeline which uses ObjectiveTranslator
* implementations to translate flow objectives for the different blocks,
* filtering, forwarding and next.
*/
public class FabricPipeliner extends AbstractFabricHandlerBehavior
        implements Pipeliner {

    private static final Logger log = getLogger(FabricPipeliner.class);

    /**
     * Kryo namespace used to serialize and deserialize
     * {@link FabricNextGroup} instances (see {@link FabricNextGroup#data()}
     * and {@link #getNextMappings(NextGroup)}).
     */
    protected static final KryoNamespace KRYO = new KryoNamespace.Builder()
            .register(KryoNamespaces.API)
            .register(FabricNextGroup.class)
            .build("FabricPipeliner");

    // Device and services, all set in init().
    protected DeviceId deviceId;
    protected FlowRuleService flowRuleService;
    protected GroupService groupService;
    protected FlowObjectiveStore flowObjectiveStore;

    // One translator per flow objective kind, created in init().
    private FilteringObjectiveTranslator filteringTranslator;
    private ForwardingObjectiveTranslator forwardingTranslator;
    private NextObjectiveTranslator nextTranslator;

    // Executor on which objective context callbacks (onSuccess/onError) run.
    private final ExecutorService callbackExecutor = SharedExecutors.getPoolThreadExecutor();

    /**
     * Creates a new instance of this behavior with the given capabilities.
     *
     * @param capabilities capabilities
     */
    public FabricPipeliner(FabricCapabilities capabilities) {
        super(capabilities);
    }

    /**
     * Create a new instance of this behaviour. Used by the abstract projectable
     * model (i.e., {@link org.onosproject.net.Device#as(Class)}).
     */
    public FabricPipeliner() {
        super();
    }

    /**
     * Initializes this pipeliner: stores the device ID, looks up the required
     * services from the context's service directory and creates the objective
     * translators.
     *
     * @param deviceId device ID
     * @param context  pipeliner context providing the service directory
     */
    @Override
    public void init(DeviceId deviceId, PipelinerContext context) {
        this.deviceId = deviceId;
        this.flowRuleService = context.directory().get(FlowRuleService.class);
        this.groupService = context.directory().get(GroupService.class);
        this.flowObjectiveStore = context.directory().get(FlowObjectiveStore.class);
        this.filteringTranslator = new FilteringObjectiveTranslator(deviceId, capabilities);
        this.forwardingTranslator = new ForwardingObjectiveTranslator(deviceId, capabilities);
        this.nextTranslator = new NextObjectiveTranslator(deviceId, capabilities);
    }

    /**
     * Translates the given filtering objective and applies the result.
     *
     * @param obj filtering objective
     */
    @Override
    public void filter(FilteringObjective obj) {
        final ObjectiveTranslation result = filteringTranslator.translate(obj);
        handleResult(obj, result);
    }

    /**
     * Translates the given forwarding objective and applies the result.
     *
     * @param obj forwarding objective
     */
    @Override
    public void forward(ForwardingObjective obj) {
        final ObjectiveTranslation result = forwardingTranslator.translate(obj);
        handleResult(obj, result);
    }

    /**
     * Translates the given next objective and applies the result. VERIFY
     * operations are not translated and are reported as successful; MODIFY
     * operations are not translated and are reported as
     * {@link ObjectiveError#UNSUPPORTED}.
     *
     * @param obj next objective
     */
    @Override
    public void next(NextObjective obj) {
        if (obj.op() == Objective.Operation.VERIFY) {
            // TODO: support VERIFY operation
            log.debug("VERIFY operation not yet supported for NextObjective, will return success");
            success(obj);
            return;
        }

        if (obj.op() == Objective.Operation.MODIFY) {
            // TODO: support MODIFY operation
            log.warn("MODIFY operation not yet supported for NextObjective, will return failure :(");
            fail(obj, ObjectiveError.UNSUPPORTED);
            return;
        }

        final ObjectiveTranslation result = nextTranslator.translate(obj);
        handleResult(obj, result);
    }

    /**
     * Returns one human-readable string per next mapping stored in the given
     * next group, each formatted as {@code "<type> -> <mapping>"}.
     *
     * @param nextGroup next group whose data was serialized with {@link #KRYO}
     * @return list of mapping strings
     */
    @Override
    public List<String> getNextMappings(NextGroup nextGroup) {
        final FabricNextGroup fabricNextGroup = KRYO.deserialize(nextGroup.data());
        return fabricNextGroup.nextMappings().stream()
                .map(m -> format("%s -> %s", fabricNextGroup.type(), m))
                .collect(Collectors.toList());
    }

    /**
     * Applies a translation result. If the translation carries an error, the
     * objective is failed with it and nothing is applied. Otherwise groups are
     * processed first, then flow rules, then (for next objectives) the
     * flow objective store is updated, and finally success is signalled.
     *
     * @param obj    the objective that was translated
     * @param result the translation result
     */
    private void handleResult(Objective obj, ObjectiveTranslation result) {
        if (result.error().isPresent()) {
            fail(obj, result.error().get());
            return;
        }
        processGroups(obj, result.groups());
        processFlows(obj, result.flowRules());
        if (obj instanceof NextObjective) {
            handleNextGroup((NextObjective) obj);
        }
        success(obj);
    }

    /**
     * Updates the flow objective store for the given next objective: REMOVE
     * deletes the stored next group, any add/remove-from-existing/modify
     * operation (re)writes it, VERIFY leaves the store untouched.
     *
     * @param obj next objective
     */
    private void handleNextGroup(NextObjective obj) {
        switch (obj.op()) {
            case REMOVE:
                removeNextGroup(obj);
                break;
            case ADD:
            case ADD_TO_EXISTING:
            case REMOVE_FROM_EXISTING:
            case MODIFY:
                putNextGroup(obj);
                break;
            case VERIFY:
                break;
            default:
                log.error("Unknown NextObjective operation '{}'", obj.op());
        }
    }

    /**
     * Submits the given flow rules to the flow rule service, adding them for
     * ADD/ADD_TO_EXISTING and removing them for REMOVE/REMOVE_FROM_EXISTING.
     * Does nothing if the collection is empty; logs a warning and applies
     * nothing for any other operation.
     *
     * @param objective the objective the rules were derived from
     * @param flowRules flow rules to apply
     */
    private void processFlows(Objective objective, Collection<FlowRule> flowRules) {
        if (flowRules.isEmpty()) {
            return;
        }
        final FlowRuleOperations.Builder ops = FlowRuleOperations.builder();
        switch (objective.op()) {
            case ADD:
            case ADD_TO_EXISTING:
                flowRules.forEach(ops::add);
                break;
            case REMOVE:
            case REMOVE_FROM_EXISTING:
                flowRules.forEach(ops::remove);
                break;
            default:
                log.warn("Unsupported Objective operation '{}'", objective.op());
                return;
        }
        flowRuleService.apply(ops.build());
    }

    /**
     * Applies the given group descriptions through the group service
     * according to the objective operation: add group, remove group, add
     * buckets to an existing group, or remove buckets from an existing group.
     * Does nothing if the collection is empty; logs a warning for any other
     * operation.
     *
     * @param objective the objective the groups were derived from
     * @param groups    group descriptions to apply
     */
    private void processGroups(Objective objective, Collection<GroupDescription> groups) {
        if (groups.isEmpty()) {
            return;
        }
        switch (objective.op()) {
            case ADD:
                groups.forEach(groupService::addGroup);
                break;
            case REMOVE:
                groups.forEach(group -> groupService.removeGroup(
                        deviceId, group.appCookie(), objective.appId()));
                break;
            case ADD_TO_EXISTING:
                // The same app cookie is passed as both old and new cookie.
                groups.forEach(group -> groupService.addBucketsToGroup(
                        deviceId, group.appCookie(), group.buckets(),
                        group.appCookie(), group.appId())
                );
                break;
            case REMOVE_FROM_EXISTING:
                // The same app cookie is passed as both old and new cookie.
                groups.forEach(group -> groupService.removeBucketsFromGroup(
                        deviceId, group.appCookie(), group.buckets(),
                        group.appCookie(), group.appId())
                );
                break;
            default:
                log.warn("Unsupported Objective operation {}", objective.op());
        }
    }

    /**
     * Asynchronously notifies the objective's context, if any, of an error.
     *
     * @param objective the failed objective
     * @param error     the error to report
     */
    private void fail(Objective objective, ObjectiveError error) {
        CompletableFuture.runAsync(
                () -> objective.context().ifPresent(
                        ctx -> ctx.onError(objective, error)), callbackExecutor);
    }

    /**
     * Asynchronously notifies the objective's context, if any, of success.
     *
     * @param objective the successful objective
     */
    private void success(Objective objective) {
        CompletableFuture.runAsync(
                () -> objective.context().ifPresent(
                        ctx -> ctx.onSuccess(objective)), callbackExecutor);
    }

    /**
     * Removes the next group stored under the objective's ID from the flow
     * objective store, logging at debug level if the store returned none.
     *
     * @param obj next objective being removed
     */
    private void removeNextGroup(NextObjective obj) {
        final NextGroup removed = flowObjectiveStore.removeNextGroup(obj.id());
        if (removed == null) {
            // Pass the ID so that the '{}' placeholder is actually filled in.
            log.debug("NextGroup {} was not found in FlowObjectiveStore", obj.id());
        }
    }

    /**
     * Builds a {@link FabricNextGroup} from the objective's type and next
     * treatments and stores it under the objective's ID.
     *
     * @param obj next objective being added or updated
     */
    private void putNextGroup(NextObjective obj) {
        final List<String> nextMappings = obj.nextTreatments().stream()
                .map(this::nextTreatmentToMappingString)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
        final FabricNextGroup nextGroup = new FabricNextGroup(obj.type(), nextMappings);
        flowObjectiveStore.putNextGroup(obj.id(), nextGroup);
    }

    /**
     * Returns a short string describing the given next treatment:
     * {@code "OUTPUT:<port>"} (or {@code "UNKNOWN"} when no output port is
     * found) for TREATMENT, {@code "NEXT_ID:<id>"} for ID, and {@code "???"}
     * for any other type.
     *
     * @param n next treatment
     * @return mapping string
     */
    private String nextTreatmentToMappingString(NextTreatment n) {
        switch (n.type()) {
            case TREATMENT:
                final PortNumber p = outputPort(n);
                return p == null ? "UNKNOWN"
                        : format("OUTPUT:%s", p.toString());
            case ID:
                final IdNextTreatment id = (IdNextTreatment) n;
                return format("NEXT_ID:%d", id.nextId());
            default:
                log.warn("Unknown NextTreatment type '{}'", n.type());
                return "???";
        }
    }

    /**
     * NextGroup implementation holding the next objective type and an
     * immutable list of mapping strings; serialized with {@link #KRYO}.
     */
    private static class FabricNextGroup implements NextGroup {

        private final NextObjective.Type type;
        private final List<String> nextMappings;

        /**
         * Creates a next group; the given list is copied into an immutable
         * list.
         *
         * @param type         next objective type
         * @param nextMappings mapping strings
         */
        FabricNextGroup(NextObjective.Type type, List<String> nextMappings) {
            this.type = type;
            this.nextMappings = ImmutableList.copyOf(nextMappings);
        }

        /**
         * Returns the next objective type.
         *
         * @return next objective type
         */
        NextObjective.Type type() {
            return type;
        }

        /**
         * Returns the mapping strings.
         *
         * @return immutable collection of mapping strings
         */
        Collection<String> nextMappings() {
            return nextMappings;
        }

        /**
         * Returns this object serialized with {@link #KRYO}.
         *
         * @return serialized bytes
         */
        @Override
        public byte[] data() {
            return KRYO.serialize(this);
        }
    }
}