/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.pipelines.fabric.pipeliner;
18
Yi Tsengf78e1742018-04-08 19:57:17 +080019import com.google.common.cache.Cache;
20import com.google.common.cache.CacheBuilder;
21import com.google.common.cache.RemovalCause;
22import com.google.common.cache.RemovalListener;
Yi Tseng0b809722017-11-03 10:23:26 -070023import com.google.common.collect.ImmutableList;
24import com.google.common.collect.Lists;
Yi Tsengf78e1742018-04-08 19:57:17 +080025import com.google.common.collect.Sets;
Yi Tseng0b809722017-11-03 10:23:26 -070026import org.onlab.util.KryoNamespace;
Yi Tsengf78e1742018-04-08 19:57:17 +080027import org.onosproject.core.GroupId;
Yi Tseng0b809722017-11-03 10:23:26 -070028import org.onosproject.net.DeviceId;
29import org.onosproject.net.PortNumber;
30import org.onosproject.net.behaviour.NextGroup;
31import org.onosproject.net.behaviour.Pipeliner;
32import org.onosproject.net.behaviour.PipelinerContext;
33import org.onosproject.net.driver.AbstractHandlerBehaviour;
Yi Tseng4fd28432018-02-01 14:48:03 -080034import org.onosproject.net.driver.Driver;
Yi Tsengf78e1742018-04-08 19:57:17 +080035import org.onosproject.net.flow.FlowId;
Yi Tseng0b809722017-11-03 10:23:26 -070036import org.onosproject.net.flow.FlowRule;
37import org.onosproject.net.flow.FlowRuleOperations;
38import org.onosproject.net.flow.FlowRuleOperationsContext;
39import org.onosproject.net.flow.FlowRuleService;
40import org.onosproject.net.flow.instructions.Instruction;
41import org.onosproject.net.flow.instructions.Instructions;
42import org.onosproject.net.flowobjective.FilteringObjective;
43import org.onosproject.net.flowobjective.FlowObjectiveStore;
44import org.onosproject.net.flowobjective.ForwardingObjective;
45import org.onosproject.net.flowobjective.NextObjective;
46import org.onosproject.net.flowobjective.Objective;
47import org.onosproject.net.flowobjective.ObjectiveError;
48import org.onosproject.net.group.GroupDescription;
49import org.onosproject.net.group.GroupEvent;
50import org.onosproject.net.group.GroupListener;
51import org.onosproject.net.group.GroupService;
52import org.onosproject.store.serializers.KryoNamespaces;
53import org.slf4j.Logger;
54
55import java.util.Collection;
56import java.util.List;
Yi Tseng1b154bd2017-11-20 17:48:19 -080057import java.util.Map;
Yi Tsengf78e1742018-04-08 19:57:17 +080058import java.util.Set;
59import java.util.concurrent.ConcurrentHashMap;
Yi Tseng0b809722017-11-03 10:23:26 -070060import java.util.concurrent.TimeUnit;
Yi Tseng0b809722017-11-03 10:23:26 -070061import java.util.function.Consumer;
Yi Tseng1b154bd2017-11-20 17:48:19 -080062import java.util.stream.Collectors;
Yi Tseng0b809722017-11-03 10:23:26 -070063
64import static org.slf4j.LoggerFactory.getLogger;
65
66/**
67 * Pipeliner for fabric pipeline.
68 */
69public class FabricPipeliner extends AbstractHandlerBehaviour implements Pipeliner {
70 private static final Logger log = getLogger(FabricPipeliner.class);
71
72 protected static final KryoNamespace KRYO = new KryoNamespace.Builder()
73 .register(KryoNamespaces.API)
74 .register(FabricNextGroup.class)
75 .build("FabricPipeliner");
76
Yi Tsengf78e1742018-04-08 19:57:17 +080077 private static final Set<GroupEvent.Type> GROUP_FAILED_TYPES =
78 Sets.newHashSet(GroupEvent.Type.GROUP_ADD_FAILED,
79 GroupEvent.Type.GROUP_REMOVE_FAILED,
80 GroupEvent.Type.GROUP_UPDATE_FAILED);
81
Yi Tseng0b809722017-11-03 10:23:26 -070082 // TODO: make this configurable
Yi Tseng1b154bd2017-11-20 17:48:19 -080083 private static final long DEFAULT_INSTALLATION_TIME_OUT = 40;
Yi Tseng0b809722017-11-03 10:23:26 -070084
85 protected DeviceId deviceId;
86 protected FlowRuleService flowRuleService;
87 protected GroupService groupService;
Yi Tsengf78e1742018-04-08 19:57:17 +080088 protected GroupListener groupListener = new InternalGroupListener();
Yi Tseng0b809722017-11-03 10:23:26 -070089 protected FlowObjectiveStore flowObjectiveStore;
90 protected FabricFilteringPipeliner pipelinerFilter;
91 protected FabricForwardingPipeliner pipelinerForward;
92 protected FabricNextPipeliner pipelinerNext;
93
Yi Tsengf78e1742018-04-08 19:57:17 +080094 private Map<FlowId, PendingInstallObjective> pendingInstallObjectiveFlows = new ConcurrentHashMap<>();
95 private Map<GroupId, PendingInstallObjective> pendingInstallObjectiveGroups = new ConcurrentHashMap<>();
96 private Cache<Objective, PendingInstallObjective> pendingInstallObjectives = CacheBuilder.newBuilder()
97 .expireAfterWrite(DEFAULT_INSTALLATION_TIME_OUT, TimeUnit.SECONDS)
98 .removalListener((RemovalListener<Objective, PendingInstallObjective>) removalNotification -> {
99 RemovalCause cause = removalNotification.getCause();
100 PendingInstallObjective pio = removalNotification.getValue();
101 if (cause == RemovalCause.EXPIRED && pio != null) {
102 pio.failed(ObjectiveError.INSTALLATIONTIMEOUT);
103 }
104 })
105 .build();
106
Yi Tseng0b809722017-11-03 10:23:26 -0700107
108 @Override
109 public void init(DeviceId deviceId, PipelinerContext context) {
Yi Tseng4fd28432018-02-01 14:48:03 -0800110 Driver driver = handler().driver();
Yi Tseng0b809722017-11-03 10:23:26 -0700111 this.deviceId = deviceId;
112 this.flowRuleService = context.directory().get(FlowRuleService.class);
113 this.groupService = context.directory().get(GroupService.class);
Yi Tsengf78e1742018-04-08 19:57:17 +0800114 this.groupService.addListener(groupListener);
Yi Tseng0b809722017-11-03 10:23:26 -0700115 this.flowObjectiveStore = context.directory().get(FlowObjectiveStore.class);
116 this.pipelinerFilter = new FabricFilteringPipeliner(deviceId);
117 this.pipelinerForward = new FabricForwardingPipeliner(deviceId);
Yi Tseng4fd28432018-02-01 14:48:03 -0800118 this.pipelinerNext = new FabricNextPipeliner(deviceId, driver);
Yi Tseng0b809722017-11-03 10:23:26 -0700119 }
120
121 @Override
122 public void filter(FilteringObjective filterObjective) {
123 PipelinerTranslationResult result = pipelinerFilter.filter(filterObjective);
124 if (result.error().isPresent()) {
125 fail(filterObjective, result.error().get());
126 return;
127 }
128
129 applyTranslationResult(filterObjective, result, success -> {
130 if (success) {
131 success(filterObjective);
132 } else {
133 fail(filterObjective, ObjectiveError.FLOWINSTALLATIONFAILED);
134 }
135 });
136 }
137
138 @Override
139 public void forward(ForwardingObjective forwardObjective) {
140 PipelinerTranslationResult result = pipelinerForward.forward(forwardObjective);
141 if (result.error().isPresent()) {
142 fail(forwardObjective, result.error().get());
143 return;
144 }
145
146 applyTranslationResult(forwardObjective, result, success -> {
147 if (success) {
148 success(forwardObjective);
149 } else {
150 fail(forwardObjective, ObjectiveError.FLOWINSTALLATIONFAILED);
151 }
152 });
153 }
154
155 @Override
156 public void next(NextObjective nextObjective) {
157 PipelinerTranslationResult result = pipelinerNext.next(nextObjective);
158
159 if (result.error().isPresent()) {
160 fail(nextObjective, result.error().get());
161 return;
162 }
163
Yi Tseng1b154bd2017-11-20 17:48:19 -0800164 if (nextObjective.op() == Objective.Operation.VERIFY) {
165 // TODO: support VERIFY operation
166 log.debug("Currently we don't support VERIFY operation, return success directly to the context");
167 success(nextObjective);
168 return;
169 }
170
Yi Tseng0b809722017-11-03 10:23:26 -0700171 applyTranslationResult(nextObjective, result, success -> {
172 if (!success) {
173 fail(nextObjective, ObjectiveError.GROUPINSTALLATIONFAILED);
174 return;
175 }
176
177 // Success, put next group to objective store
178 List<PortNumber> portNumbers = Lists.newArrayList();
179 nextObjective.next().forEach(treatment -> {
180 Instructions.OutputInstruction outputInst = treatment.allInstructions()
181 .stream()
182 .filter(inst -> inst.type() == Instruction.Type.OUTPUT)
183 .map(inst -> (Instructions.OutputInstruction) inst)
184 .findFirst()
185 .orElse(null);
186
187 if (outputInst != null) {
188 portNumbers.add(outputInst.port());
189 }
190 });
191 FabricNextGroup nextGroup = new FabricNextGroup(nextObjective.type(),
192 portNumbers);
193 flowObjectiveStore.putNextGroup(nextObjective.id(), nextGroup);
194 success(nextObjective);
195 });
196 }
197
198 @Override
199 public List<String> getNextMappings(NextGroup nextGroup) {
Yi Tseng1b154bd2017-11-20 17:48:19 -0800200 FabricNextGroup fabricNextGroup = KRYO.deserialize(nextGroup.data());
201 NextObjective.Type type = fabricNextGroup.type();
202 Collection<PortNumber> outputPorts = fabricNextGroup.outputPorts();
203
204 return outputPorts.stream()
205 .map(port -> String.format("%s -> %s", type, port))
206 .collect(Collectors.toList());
Yi Tseng0b809722017-11-03 10:23:26 -0700207 }
208
209 private void applyTranslationResult(Objective objective,
210 PipelinerTranslationResult result,
211 Consumer<Boolean> callback) {
212 Collection<GroupDescription> groups = result.groups();
213 Collection<FlowRule> flowRules = result.flowRules();
Yi Tsengf78e1742018-04-08 19:57:17 +0800214
215 Set<FlowId> flowIds = flowRules.stream().map(FlowRule::id).collect(Collectors.toSet());
216 Set<GroupId> groupIds = groups.stream().map(GroupDescription::givenGroupId)
217 .map(GroupId::new).collect(Collectors.toSet());
218
219 PendingInstallObjective pio =
220 new PendingInstallObjective(objective, flowIds, groupIds, callback);
221
222 flowIds.forEach(flowId -> {
223 pendingInstallObjectiveFlows.put(flowId, pio);
224 });
225
226 groupIds.forEach(groupId -> {
227 pendingInstallObjectiveGroups.put(groupId, pio);
228 });
229
230 pendingInstallObjectives.put(objective, pio);
231 installGroups(objective, groups);
232 installFlows(objective, flowRules);
Yi Tseng0b809722017-11-03 10:23:26 -0700233 }
234
Yi Tsengf78e1742018-04-08 19:57:17 +0800235 private void installFlows(Objective objective, Collection<FlowRule> flowRules) {
Yi Tseng0b809722017-11-03 10:23:26 -0700236 if (flowRules.isEmpty()) {
Yi Tsengf78e1742018-04-08 19:57:17 +0800237 return;
Yi Tseng0b809722017-11-03 10:23:26 -0700238 }
Yi Tsengf78e1742018-04-08 19:57:17 +0800239
Yi Tseng0b809722017-11-03 10:23:26 -0700240 FlowRuleOperationsContext ctx = new FlowRuleOperationsContext() {
241 @Override
242 public void onSuccess(FlowRuleOperations ops) {
Yi Tsengf78e1742018-04-08 19:57:17 +0800243 ops.stages().forEach(stage -> {
244 stage.forEach(op -> {
245 FlowId flowId = op.rule().id();
246 PendingInstallObjective pio = pendingInstallObjectiveFlows.remove(flowId);
247
248 if (pio != null) {
249 pio.flowInstalled(flowId);
250 }
251 });
252 });
Yi Tseng0b809722017-11-03 10:23:26 -0700253 }
254
255 @Override
256 public void onError(FlowRuleOperations ops) {
257 log.warn("Failed to install flow rules: {}", flowRules);
Yi Tsengf78e1742018-04-08 19:57:17 +0800258 PendingInstallObjective pio = pendingInstallObjectives.getIfPresent(objective);
259 if (pio != null) {
260 pio.failed(ObjectiveError.FLOWINSTALLATIONFAILED);
261 }
Yi Tseng0b809722017-11-03 10:23:26 -0700262 }
263 };
264
265 FlowRuleOperations ops = buildFlowRuleOps(objective, flowRules, ctx);
266 flowRuleService.apply(ops);
Yi Tseng0b809722017-11-03 10:23:26 -0700267 }
268
Yi Tsengf78e1742018-04-08 19:57:17 +0800269 private void installGroups(Objective objective, Collection<GroupDescription> groups) {
Yi Tseng0b809722017-11-03 10:23:26 -0700270 if (groups.isEmpty()) {
Yi Tsengf78e1742018-04-08 19:57:17 +0800271 return;
Yi Tseng0b809722017-11-03 10:23:26 -0700272 }
Yi Tseng0b809722017-11-03 10:23:26 -0700273
274 switch (objective.op()) {
275 case ADD:
276 groups.forEach(groupService::addGroup);
277 break;
278 case REMOVE:
279 groups.forEach(group -> groupService.removeGroup(deviceId, group.appCookie(), objective.appId()));
280 break;
Yi Tseng1b154bd2017-11-20 17:48:19 -0800281 case ADD_TO_EXISTING:
282 groups.forEach(group -> {
283 groupService.addBucketsToGroup(deviceId, group.appCookie(),
284 group.buckets(),
285 group.appCookie(),
286 group.appId());
287 });
288 break;
289 case REMOVE_FROM_EXISTING:
290 groups.forEach(group -> {
291 groupService.removeBucketsFromGroup(deviceId, group.appCookie(),
292 group.buckets(),
293 group.appCookie(),
294 group.appId());
295 });
296 break;
Yi Tseng0b809722017-11-03 10:23:26 -0700297 default:
298 log.warn("Unsupported objective operation {}", objective.op());
Yi Tseng0b809722017-11-03 10:23:26 -0700299 }
300 }
301
302 static void fail(Objective objective, ObjectiveError error) {
303 objective.context().ifPresent(ctx -> ctx.onError(objective, error));
304 }
305
306 static void success(Objective objective) {
307 objective.context().ifPresent(ctx -> ctx.onSuccess(objective));
308 }
309
310 static FlowRuleOperations buildFlowRuleOps(Objective objective, Collection<FlowRule> flowRules,
311 FlowRuleOperationsContext ctx) {
312 FlowRuleOperations.Builder ops = FlowRuleOperations.builder();
313 switch (objective.op()) {
314 case ADD:
315 flowRules.forEach(ops::add);
316 break;
317 case REMOVE:
318 flowRules.forEach(ops::remove);
319 break;
320 default:
321 log.warn("Unsupported op {} for {}", objective);
322 fail(objective, ObjectiveError.BADPARAMS);
323 return null;
324 }
325 return ops.build(ctx);
326 }
327
328 class FabricNextGroup implements NextGroup {
329 private NextObjective.Type type;
330 private Collection<PortNumber> outputPorts;
331
332 public FabricNextGroup(NextObjective.Type type, Collection<PortNumber> outputPorts) {
333 this.type = type;
334 this.outputPorts = ImmutableList.copyOf(outputPorts);
335 }
336
337 public NextObjective.Type type() {
338 return type;
339 }
340
341 public Collection<PortNumber> outputPorts() {
342 return outputPorts;
343 }
344
345 @Override
346 public byte[] data() {
347 return KRYO.serialize(this);
348 }
349 }
Yi Tseng1b154bd2017-11-20 17:48:19 -0800350
Yi Tsengf78e1742018-04-08 19:57:17 +0800351 class InternalGroupListener implements GroupListener {
352 @Override
353 public void event(GroupEvent event) {
354 GroupId groupId = event.subject().id();
355 PendingInstallObjective pio = pendingInstallObjectiveGroups.remove(groupId);
356 if (pio == null) {
357 return;
358 }
359 if (GROUP_FAILED_TYPES.contains(event.type())) {
360 pio.failed(ObjectiveError.GROUPINSTALLATIONFAILED);
361 }
362 pio.groupInstalled(groupId);
363 }
364
365 @Override
366 public boolean isRelevant(GroupEvent event) {
367 return pendingInstallObjectiveGroups.containsKey(event.subject().id());
368 }
369 }
370
371 class PendingInstallObjective {
372 Objective objective;
373 Collection<FlowId> flowIds;
374 Collection<GroupId> groupIds;
375 Consumer<Boolean> callback;
376
377 public PendingInstallObjective(Objective objective, Collection<FlowId> flowIds,
378 Collection<GroupId> groupIds, Consumer<Boolean> callback) {
379 this.objective = objective;
380 this.flowIds = flowIds;
381 this.groupIds = groupIds;
382 this.callback = callback;
383 }
384
385 void flowInstalled(FlowId flowId) {
386 flowIds.remove(flowId);
387 checkIfFinished();
388 }
389
390 void groupInstalled(GroupId groupId) {
391 groupIds.remove(groupId);
392 checkIfFinished();
393 }
394
395 private void checkIfFinished() {
396 if (flowIds.isEmpty() && groupIds.isEmpty()) {
397 pendingInstallObjectives.invalidate(objective);
398 callback.accept(true);
399 }
400 }
401
402 void failed(ObjectiveError error) {
403 flowIds.forEach(pendingInstallObjectiveFlows::remove);
404 groupIds.forEach(pendingInstallObjectiveGroups::remove);
405 pendingInstallObjectives.invalidate(objective);
406 fail(objective, error);
407 }
408 }
Yi Tseng0b809722017-11-03 10:23:26 -0700409}