[ONOS-7129] Pipeliner for fabric pipeline
Change-Id: I86b44694e1251611359e8ddc8be2533a741230cc
diff --git a/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
new file mode 100644
index 0000000..e2d8048
--- /dev/null
+++ b/pipelines/fabric/src/main/java/org/onosproject/pipelines/fabric/pipeliner/FabricPipeliner.java
@@ -0,0 +1,400 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.pipelines.fabric.pipeliner;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.behaviour.NextGroup;
+import org.onosproject.net.behaviour.Pipeliner;
+import org.onosproject.net.behaviour.PipelinerContext;
+import org.onosproject.net.driver.AbstractHandlerBehaviour;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleOperations;
+import org.onosproject.net.flow.FlowRuleOperationsContext;
+import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.flow.instructions.Instruction;
+import org.onosproject.net.flow.instructions.Instructions;
+import org.onosproject.net.flowobjective.FilteringObjective;
+import org.onosproject.net.flowobjective.FlowObjectiveStore;
+import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.NextObjective;
+import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.group.GroupDescription;
+import org.onosproject.net.group.GroupEvent;
+import org.onosproject.net.group.GroupListener;
+import org.onosproject.net.group.GroupService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Pipeliner for fabric pipeline.
+ * <p>
+ * Each objective is translated into groups and flow rules by the matching
+ * fabric translator. Groups are installed first and flow rules only once the
+ * groups succeeded; the outcome is reported through the context of the
+ * objective when one is present.
+ */
+public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeliner {
+    private static final Logger log = getLogger(FabricPipeliner.class);
+
+    protected static final KryoNamespace KRYO = new KryoNamespace.Builder()
+            .register(KryoNamespaces.API)
+            .register(FabricNextGroup.class)
+            .build("FabricPipeliner");
+
+    // Maximum time, in seconds, to wait for groups or flow rules to be installed.
+    // TODO: make this configurable
+    private static final long DEFAULT_INSTALLATION_TIME_OUT = 10;
+
+    protected DeviceId deviceId;
+    protected FlowRuleService flowRuleService;
+    protected GroupService groupService;
+    protected FlowObjectiveStore flowObjectiveStore;
+    protected FabricFilteringPipeliner pipelinerFilter;
+    protected FabricForwardingPipeliner pipelinerForward;
+    protected FabricNextPipeliner pipelinerNext;
+
+    @Override
+    public void init(DeviceId deviceId, PipelinerContext context) {
+        this.deviceId = deviceId;
+        this.flowRuleService = context.directory().get(FlowRuleService.class);
+        this.groupService = context.directory().get(GroupService.class);
+        this.flowObjectiveStore = context.directory().get(FlowObjectiveStore.class);
+        this.pipelinerFilter = new FabricFilteringPipeliner(deviceId);
+        this.pipelinerForward = new FabricForwardingPipeliner(deviceId);
+        this.pipelinerNext = new FabricNextPipeliner(deviceId);
+    }
+
+    @Override
+    public void filter(FilteringObjective filterObjective) {
+        applyFlowObjective(filterObjective, pipelinerFilter.filter(filterObjective));
+    }
+
+    @Override
+    public void forward(ForwardingObjective forwardObjective) {
+        applyFlowObjective(forwardObjective, pipelinerForward.forward(forwardObjective));
+    }
+
+    @Override
+    public void next(NextObjective nextObjective) {
+        PipelinerTranslationResult result = pipelinerNext.next(nextObjective);
+
+        if (result.error().isPresent()) {
+            fail(nextObjective, result.error().get());
+            return;
+        }
+
+        applyTranslationResult(nextObjective, result, installed -> {
+            if (!installed) {
+                fail(nextObjective, ObjectiveError.GROUPINSTALLATIONFAILED);
+                return;
+            }
+
+            // Success, keep the objective store in sync
+            updateNextGroupStore(nextObjective);
+            success(nextObjective);
+        });
+    }
+
+    @Override
+    public List<String> getNextMappings(NextGroup nextGroup) {
+        // Next mappings are not produced yet; an empty list is returned instead
+        // of null so that callers can use the result without a null check.
+        return ImmutableList.of();
+    }
+
+    /**
+     * Fails the objective when its translation reported an error; otherwise
+     * installs the translated groups and flow rules and notifies the context
+     * of the objective about the outcome.
+     *
+     * @param objective the filtering or forwarding objective being processed
+     * @param result the translation result of the objective
+     */
+    private void applyFlowObjective(Objective objective, PipelinerTranslationResult result) {
+        if (result.error().isPresent()) {
+            fail(objective, result.error().get());
+            return;
+        }
+
+        applyTranslationResult(objective, result, installed -> {
+            if (installed) {
+                success(objective);
+            } else {
+                fail(objective, ObjectiveError.FLOWINSTALLATIONFAILED);
+            }
+        });
+    }
+
+    /**
+     * Keeps the objective store in sync with a successfully applied next
+     * objective: the next group is dropped from the store on removal and
+     * recorded, with the output ports of its treatments, otherwise.
+     *
+     * @param nextObjective the next objective that was applied
+     */
+    private void updateNextGroupStore(NextObjective nextObjective) {
+        if (nextObjective.op() == Objective.Operation.REMOVE) {
+            flowObjectiveStore.removeNextGroup(nextObjective.id());
+            return;
+        }
+
+        // Only the first output instruction of each treatment is recorded
+        List<PortNumber> portNumbers = Lists.newArrayList();
+        nextObjective.next().forEach(treatment -> treatment.allInstructions()
+                .stream()
+                .filter(inst -> inst.type() == Instruction.Type.OUTPUT)
+                .map(inst -> (Instructions.OutputInstruction) inst)
+                .findFirst()
+                .ifPresent(outputInst -> portNumbers.add(outputInst.port())));
+        FabricNextGroup nextGroup = new FabricNextGroup(nextObjective.type(), portNumbers);
+        flowObjectiveStore.putNextGroup(nextObjective.id(), nextGroup);
+    }
+
+    /**
+     * Installs the groups and then the flow rules of a translation result
+     * asynchronously and hands the overall outcome to the callback. Flow
+     * rules are attempted only when the groups were installed successfully.
+     *
+     * @param objective the objective the result was translated from
+     * @param result the translation result to apply
+     * @param callback receives true if everything was installed, false otherwise
+     */
+    private void applyTranslationResult(Objective objective,
+                                        PipelinerTranslationResult result,
+                                        Consumer<Boolean> callback) {
+        Collection<GroupDescription> groups = result.groups();
+        Collection<FlowRule> flowRules = result.flowRules();
+        CompletableFuture.supplyAsync(() -> installGroups(objective, groups))
+                .thenApplyAsync(groupSuccess -> groupSuccess && installFlows(objective, flowRules))
+                .thenAcceptAsync(callback)
+                .exceptionally((ex) -> {
+                    // The throwable is passed last so that its stack trace is logged
+                    log.warn("Got unexpected exception while applying translation result {}",
+                             result, ex);
+                    fail(objective, ObjectiveError.UNKNOWN);
+                    return null;
+                });
+    }
+
+    /**
+     * Installs or removes the given flow rules, depending on the operation of
+     * the objective, and waits for the flow rule service to report the outcome.
+     *
+     * @param objective the objective the flow rules were translated from
+     * @param flowRules the flow rules to apply
+     * @return true if there was nothing to apply or the operation succeeded;
+     * false on failure, timeout, interruption or unsupported operation
+     */
+    private boolean installFlows(Objective objective, Collection<FlowRule> flowRules) {
+        if (flowRules.isEmpty()) {
+            return true;
+        }
+        CompletableFuture<Boolean> flowInstallFuture = new CompletableFuture<>();
+        FlowRuleOperationsContext ctx = new FlowRuleOperationsContext() {
+            @Override
+            public void onSuccess(FlowRuleOperations ops) {
+                flowInstallFuture.complete(true);
+            }
+
+            @Override
+            public void onError(FlowRuleOperations ops) {
+                log.warn("Failed to install flow rules: {}", flowRules);
+                flowInstallFuture.complete(false);
+            }
+        };
+
+        FlowRuleOperations ops = buildFlowRuleOps(objective, flowRules, ctx);
+        if (ops == null) {
+            // Unsupported operation: buildFlowRuleOps has already logged it and
+            // notified the context of the objective, so nothing is submitted.
+            return false;
+        }
+        flowRuleService.apply(ops);
+
+        return awaitInstallation(flowInstallFuture, "flow rules");
+    }
+
+    /**
+     * Adds or removes the given groups, depending on the operation of the
+     * objective, and waits until a group event was seen for each of them.
+     *
+     * @param objective the objective the groups were translated from
+     * @param groups the groups to apply
+     * @return true if there was nothing to apply or all expected events
+     * arrived in time; false on timeout, interruption or unsupported operation
+     */
+    private boolean installGroups(Objective objective, Collection<GroupDescription> groups) {
+        if (groups.isEmpty()) {
+            return true;
+        }
+        int numGroupsToBeInstalled = groups.size();
+        CompletableFuture<Boolean> groupInstallFuture = new CompletableFuture<>();
+        AtomicInteger numGroupsInstalled = new AtomicInteger(0);
+        GroupListener listener = new GroupListener() {
+            @Override
+            public void event(GroupEvent event) {
+                // NOTE(review): every relevant event is counted regardless of
+                // its type, presumably including failure events - confirm.
+                if (numGroupsInstalled.incrementAndGet() == numGroupsToBeInstalled) {
+                    groupInstallFuture.complete(true);
+                }
+            }
+
+            @Override
+            public boolean isRelevant(GroupEvent event) {
+                // NOTE(review): relies on the event subject being equal to one
+                // of the submitted group descriptions - confirm.
+                return groups.contains(event.subject());
+            }
+        };
+        groupService.addListener(listener);
+
+        // The listener is unregistered in exactly one place so that it can
+        // neither leak on timeout or error nor be removed twice.
+        try {
+            switch (objective.op()) {
+                case ADD:
+                    groups.forEach(groupService::addGroup);
+                    break;
+                case REMOVE:
+                    groups.forEach(group -> groupService.removeGroup(
+                            deviceId, group.appCookie(), objective.appId()));
+                    break;
+                default:
+                    // Nothing was submitted, so do not wait for the timeout
+                    log.warn("Unsupported objective operation {}", objective.op());
+                    return false;
+            }
+            return awaitInstallation(groupInstallFuture, "groups");
+        } finally {
+            groupService.removeListener(listener);
+        }
+    }
+
+    /**
+     * Waits for an installation to complete, for at most the default timeout.
+     *
+     * @param future completed with the outcome of the installation
+     * @param what description of what is being installed, used for logging
+     * @return the outcome of the installation; false if it did not complete
+     * in time, failed exceptionally or the wait was interrupted
+     */
+    private static boolean awaitInstallation(CompletableFuture<Boolean> future, String what) {
+        try {
+            return future.get(DEFAULT_INSTALLATION_TIME_OUT, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag for the code further up the stack
+            Thread.currentThread().interrupt();
+            log.warn("Interrupted while installing {}", what, e);
+            return false;
+        } catch (ExecutionException | TimeoutException e) {
+            log.warn("Got exception while installing {}", what, e);
+            return false;
+        }
+    }
+
+    /**
+     * Notifies the context of the objective, if any, about an error.
+     *
+     * @param objective the objective that failed
+     * @param error the error to report
+     */
+    static void fail(Objective objective, ObjectiveError error) {
+        objective.context().ifPresent(ctx -> ctx.onError(objective, error));
+    }
+
+    /**
+     * Notifies the context of the objective, if any, about its success.
+     *
+     * @param objective the objective that succeeded
+     */
+    static void success(Objective objective) {
+        objective.context().ifPresent(ctx -> ctx.onSuccess(objective));
+    }
+
+    /**
+     * Builds the flow rule operations that add or remove the given flow rules,
+     * depending on the operation of the objective.
+     *
+     * @param objective the objective the flow rules were translated from
+     * @param flowRules the flow rules to add or remove
+     * @param ctx the context the flow rule operations are built with
+     * @return the flow rule operations; null if the operation of the objective
+     * is not supported, in which case the objective has been failed already
+     */
+    static FlowRuleOperations buildFlowRuleOps(Objective objective, Collection<FlowRule> flowRules,
+                                               FlowRuleOperationsContext ctx) {
+        FlowRuleOperations.Builder ops = FlowRuleOperations.builder();
+        switch (objective.op()) {
+            case ADD:
+                flowRules.forEach(ops::add);
+                break;
+            case REMOVE:
+                flowRules.forEach(ops::remove);
+                break;
+            default:
+                log.warn("Unsupported op {} for {}", objective.op(), objective);
+                fail(objective, ObjectiveError.BADPARAMS);
+                return null;
+        }
+        return ops.build(ctx);
+    }
+
+    /**
+     * Next group kept in the objective store: the type of the next objective
+     * and the output ports collected from its treatments. Declared static so
+     * that instances do not hold a reference to the enclosing pipeliner.
+     */
+    static class FabricNextGroup implements NextGroup {
+        private final NextObjective.Type type;
+        private final Collection<PortNumber> outputPorts;
+
+        public FabricNextGroup(NextObjective.Type type, Collection<PortNumber> outputPorts) {
+            this.type = type;
+            this.outputPorts = ImmutableList.copyOf(outputPorts);
+        }
+
+        public NextObjective.Type type() {
+            return type;
+        }
+
+        public Collection<PortNumber> outputPorts() {
+            return outputPorts;
+        }
+
+        @Override
+        public byte[] data() {
+            return KRYO.serialize(this);
+        }
+    }
+}