blob: e2d8048fb1f638b37cf9dc9ec3cca91c780b18f7 [file] [log] [blame]
Yi Tseng0b809722017-11-03 10:23:26 -07001/*
2 * Copyright 2017-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.pipelines.fabric.pipeliner;
18
19import com.google.common.collect.ImmutableList;
20import com.google.common.collect.Lists;
21import org.onlab.util.KryoNamespace;
22import org.onosproject.net.DeviceId;
23import org.onosproject.net.PortNumber;
24import org.onosproject.net.behaviour.NextGroup;
25import org.onosproject.net.behaviour.Pipeliner;
26import org.onosproject.net.behaviour.PipelinerContext;
27import org.onosproject.net.driver.AbstractHandlerBehaviour;
28import org.onosproject.net.flow.FlowRule;
29import org.onosproject.net.flow.FlowRuleOperations;
30import org.onosproject.net.flow.FlowRuleOperationsContext;
31import org.onosproject.net.flow.FlowRuleService;
32import org.onosproject.net.flow.instructions.Instruction;
33import org.onosproject.net.flow.instructions.Instructions;
34import org.onosproject.net.flowobjective.FilteringObjective;
35import org.onosproject.net.flowobjective.FlowObjectiveStore;
36import org.onosproject.net.flowobjective.ForwardingObjective;
37import org.onosproject.net.flowobjective.NextObjective;
38import org.onosproject.net.flowobjective.Objective;
39import org.onosproject.net.flowobjective.ObjectiveError;
40import org.onosproject.net.group.GroupDescription;
41import org.onosproject.net.group.GroupEvent;
42import org.onosproject.net.group.GroupListener;
43import org.onosproject.net.group.GroupService;
44import org.onosproject.store.serializers.KryoNamespaces;
45import org.slf4j.Logger;
46
47import java.util.Collection;
48import java.util.List;
49import java.util.concurrent.CompletableFuture;
50import java.util.concurrent.ExecutionException;
51import java.util.concurrent.TimeUnit;
52import java.util.concurrent.TimeoutException;
53import java.util.concurrent.atomic.AtomicInteger;
54import java.util.function.Consumer;
55
56import static org.slf4j.LoggerFactory.getLogger;
57
58/**
59 * Pipeliner for fabric pipeline.
60 */
61public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeliner {
62 private static final Logger log = getLogger(FabricPipeliner.class);
63
64 protected static final KryoNamespace KRYO = new KryoNamespace.Builder()
65 .register(KryoNamespaces.API)
66 .register(FabricNextGroup.class)
67 .build("FabricPipeliner");
68
69 // TODO: make this configurable
70 private static final long DEFAULT_INSTALLATION_TIME_OUT = 10;
71
72 protected DeviceId deviceId;
73 protected FlowRuleService flowRuleService;
74 protected GroupService groupService;
75 protected FlowObjectiveStore flowObjectiveStore;
76 protected FabricFilteringPipeliner pipelinerFilter;
77 protected FabricForwardingPipeliner pipelinerForward;
78 protected FabricNextPipeliner pipelinerNext;
79
80
81 @Override
82 public void init(DeviceId deviceId, PipelinerContext context) {
83 this.deviceId = deviceId;
84 this.flowRuleService = context.directory().get(FlowRuleService.class);
85 this.groupService = context.directory().get(GroupService.class);
86 this.flowObjectiveStore = context.directory().get(FlowObjectiveStore.class);
87 this.pipelinerFilter = new FabricFilteringPipeliner(deviceId);
88 this.pipelinerForward = new FabricForwardingPipeliner(deviceId);
89 this.pipelinerNext = new FabricNextPipeliner(deviceId);
90 }
91
92 @Override
93 public void filter(FilteringObjective filterObjective) {
94 PipelinerTranslationResult result = pipelinerFilter.filter(filterObjective);
95 if (result.error().isPresent()) {
96 fail(filterObjective, result.error().get());
97 return;
98 }
99
100 applyTranslationResult(filterObjective, result, success -> {
101 if (success) {
102 success(filterObjective);
103 } else {
104 fail(filterObjective, ObjectiveError.FLOWINSTALLATIONFAILED);
105 }
106 });
107 }
108
109 @Override
110 public void forward(ForwardingObjective forwardObjective) {
111 PipelinerTranslationResult result = pipelinerForward.forward(forwardObjective);
112 if (result.error().isPresent()) {
113 fail(forwardObjective, result.error().get());
114 return;
115 }
116
117 applyTranslationResult(forwardObjective, result, success -> {
118 if (success) {
119 success(forwardObjective);
120 } else {
121 fail(forwardObjective, ObjectiveError.FLOWINSTALLATIONFAILED);
122 }
123 });
124 }
125
126 @Override
127 public void next(NextObjective nextObjective) {
128 PipelinerTranslationResult result = pipelinerNext.next(nextObjective);
129
130 if (result.error().isPresent()) {
131 fail(nextObjective, result.error().get());
132 return;
133 }
134
135 applyTranslationResult(nextObjective, result, success -> {
136 if (!success) {
137 fail(nextObjective, ObjectiveError.GROUPINSTALLATIONFAILED);
138 return;
139 }
140
141 // Success, put next group to objective store
142 List<PortNumber> portNumbers = Lists.newArrayList();
143 nextObjective.next().forEach(treatment -> {
144 Instructions.OutputInstruction outputInst = treatment.allInstructions()
145 .stream()
146 .filter(inst -> inst.type() == Instruction.Type.OUTPUT)
147 .map(inst -> (Instructions.OutputInstruction) inst)
148 .findFirst()
149 .orElse(null);
150
151 if (outputInst != null) {
152 portNumbers.add(outputInst.port());
153 }
154 });
155 FabricNextGroup nextGroup = new FabricNextGroup(nextObjective.type(),
156 portNumbers);
157 flowObjectiveStore.putNextGroup(nextObjective.id(), nextGroup);
158 success(nextObjective);
159 });
160 }
161
162 @Override
163 public List<String> getNextMappings(NextGroup nextGroup) {
164 return null;
165 }
166
167 private void applyTranslationResult(Objective objective,
168 PipelinerTranslationResult result,
169 Consumer<Boolean> callback) {
170 Collection<GroupDescription> groups = result.groups();
171 Collection<FlowRule> flowRules = result.flowRules();
172 CompletableFuture.supplyAsync(() -> installGroups(objective, groups))
173 .thenApplyAsync(groupSuccess -> groupSuccess && installFlows(objective, flowRules))
174 .thenAcceptAsync(callback)
175 .exceptionally((ex) -> {
176 log.warn("Got unexpected exception while applying translation result {}",
177 result);
178 fail(objective, ObjectiveError.UNKNOWN);
179 return null;
180 });
181 }
182
183 private boolean installFlows(Objective objective, Collection<FlowRule> flowRules) {
184 if (flowRules.isEmpty()) {
185 return true;
186 }
187 CompletableFuture<Boolean> flowInstallFuture = new CompletableFuture<>();
188 FlowRuleOperationsContext ctx = new FlowRuleOperationsContext() {
189 @Override
190 public void onSuccess(FlowRuleOperations ops) {
191 flowInstallFuture.complete(true);
192 }
193
194 @Override
195 public void onError(FlowRuleOperations ops) {
196 log.warn("Failed to install flow rules: {}", flowRules);
197 flowInstallFuture.complete(false);
198 }
199 };
200
201 FlowRuleOperations ops = buildFlowRuleOps(objective, flowRules, ctx);
202 flowRuleService.apply(ops);
203
204 try {
205 return flowInstallFuture.get(DEFAULT_INSTALLATION_TIME_OUT, TimeUnit.SECONDS);
206 } catch (InterruptedException | ExecutionException | TimeoutException e) {
207 log.warn("Got exception while installing groups: {}", e);
208 return false;
209 }
210 }
211
212 private boolean installGroups(Objective objective, Collection<GroupDescription> groups) {
213 if (groups.isEmpty()) {
214 return true;
215 }
216 int numGroupsToBeInstalled = groups.size();
217 CompletableFuture<Boolean> groupInstallFuture = new CompletableFuture<>();
218 AtomicInteger numGroupsInstalled = new AtomicInteger(0);
219 GroupListener listener = new GroupListener() {
220 @Override
221 public void event(GroupEvent event) {
222 int currentNumGroupInstalled = numGroupsInstalled.incrementAndGet();
223 if (currentNumGroupInstalled == numGroupsToBeInstalled) {
224 // install completed
225 groupService.removeListener(this);
226 groupInstallFuture.complete(true);
227 }
228 }
229 @Override
230 public boolean isRelevant(GroupEvent event) {
231 return groups.contains(event.subject());
232 }
233 };
234 groupService.addListener(listener);
235
236 switch (objective.op()) {
237 case ADD:
238 groups.forEach(groupService::addGroup);
239 break;
240 case REMOVE:
241 groups.forEach(group -> groupService.removeGroup(deviceId, group.appCookie(), objective.appId()));
242 break;
243 default:
244 log.warn("Unsupported objective operation {}", objective.op());
245 groupService.removeListener(listener);
246 }
247 try {
248 return groupInstallFuture.get(DEFAULT_INSTALLATION_TIME_OUT, TimeUnit.SECONDS);
249 } catch (InterruptedException | ExecutionException | TimeoutException e) {
250 log.warn("Got exception while installing groups: {}", e);
251 return false;
252 }
253 }
254
255 static void fail(Objective objective, ObjectiveError error) {
256 objective.context().ifPresent(ctx -> ctx.onError(objective, error));
257 }
258
259 static void success(Objective objective) {
260 objective.context().ifPresent(ctx -> ctx.onSuccess(objective));
261 }
262
263 static FlowRuleOperations buildFlowRuleOps(Objective objective, Collection<FlowRule> flowRules,
264 FlowRuleOperationsContext ctx) {
265 FlowRuleOperations.Builder ops = FlowRuleOperations.builder();
266 switch (objective.op()) {
267 case ADD:
268 flowRules.forEach(ops::add);
269 break;
270 case REMOVE:
271 flowRules.forEach(ops::remove);
272 break;
273 default:
274 log.warn("Unsupported op {} for {}", objective);
275 fail(objective, ObjectiveError.BADPARAMS);
276 return null;
277 }
278 return ops.build(ctx);
279 }
280
281 class FabricNextGroup implements NextGroup {
282 private NextObjective.Type type;
283 private Collection<PortNumber> outputPorts;
284
285 public FabricNextGroup(NextObjective.Type type, Collection<PortNumber> outputPorts) {
286 this.type = type;
287 this.outputPorts = ImmutableList.copyOf(outputPorts);
288 }
289
290 public NextObjective.Type type() {
291 return type;
292 }
293
294 public Collection<PortNumber> outputPorts() {
295 return outputPorts;
296 }
297
298 @Override
299 public byte[] data() {
300 return KRYO.serialize(this);
301 }
302 }
303}