/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.driver.pipeline;
17
18import com.google.common.cache.Cache;
19import com.google.common.cache.CacheBuilder;
20import com.google.common.cache.RemovalCause;
21import com.google.common.cache.RemovalNotification;
22import com.google.common.collect.Lists;
23import org.apache.commons.lang3.tuple.ImmutablePair;
24import org.apache.commons.lang3.tuple.Pair;
25import org.onlab.osgi.ServiceDirectory;
26import org.onlab.packet.EthType;
27import org.onlab.packet.IPv4;
28import org.onlab.packet.VlanId;
29import org.onlab.util.KryoNamespace;
30import org.onosproject.core.ApplicationId;
31import org.onosproject.core.CoreService;
32import org.onosproject.net.DeviceId;
33import org.onosproject.net.PortNumber;
34import org.onosproject.net.behaviour.NextGroup;
35import org.onosproject.net.behaviour.Pipeliner;
36import org.onosproject.net.behaviour.PipelinerContext;
37import org.onosproject.net.driver.AbstractHandlerBehaviour;
38import org.onosproject.net.flow.DefaultFlowRule;
39import org.onosproject.net.flow.DefaultTrafficSelector;
40import org.onosproject.net.flow.DefaultTrafficTreatment;
41import org.onosproject.net.flow.FlowRule;
42import org.onosproject.net.flow.FlowRuleOperations;
43import org.onosproject.net.flow.FlowRuleOperationsContext;
44import org.onosproject.net.flow.FlowRuleService;
45import org.onosproject.net.flow.TrafficSelector;
46import org.onosproject.net.flow.TrafficTreatment;
47import org.onosproject.net.flow.criteria.Criteria;
48import org.onosproject.net.flow.criteria.Criterion;
49import org.onosproject.net.flow.criteria.EthTypeCriterion;
50import org.onosproject.net.flow.criteria.IPCriterion;
51import org.onosproject.net.flow.criteria.IPProtocolCriterion;
52import org.onosproject.net.flow.criteria.PortCriterion;
53import org.onosproject.net.flow.criteria.VlanIdCriterion;
54import org.onosproject.net.flow.instructions.Instruction;
55import org.onosproject.net.flow.instructions.Instructions;
56import org.onosproject.net.flow.instructions.L2ModificationInstruction;
57import org.onosproject.net.flowobjective.FilteringObjective;
58import org.onosproject.net.flowobjective.FlowObjectiveStore;
59import org.onosproject.net.flowobjective.ForwardingObjective;
60import org.onosproject.net.flowobjective.NextObjective;
61import org.onosproject.net.flowobjective.Objective;
62import org.onosproject.net.flowobjective.ObjectiveError;
63import org.onosproject.net.group.DefaultGroupBucket;
64import org.onosproject.net.group.DefaultGroupDescription;
65import org.onosproject.net.group.DefaultGroupKey;
66import org.onosproject.net.group.Group;
67import org.onosproject.net.group.GroupBucket;
68import org.onosproject.net.group.GroupBuckets;
69import org.onosproject.net.group.GroupDescription;
70import org.onosproject.net.group.GroupEvent;
71import org.onosproject.net.group.GroupKey;
72import org.onosproject.net.group.GroupListener;
73import org.onosproject.net.group.GroupService;
74import org.onosproject.store.serializers.KryoNamespaces;
75import org.onosproject.store.service.StorageService;
76import org.slf4j.Logger;
77
78import java.util.Collection;
79import java.util.Collections;
80import java.util.List;
81import java.util.Optional;
82import java.util.concurrent.TimeUnit;
83import java.util.stream.Collectors;
84import java.util.Iterator;
85
86
87import static org.slf4j.LoggerFactory.getLogger;
88
89/**
90 * Pipeliner for OLT device.
91 */
92
93public class NokiaOltPipeline extends AbstractHandlerBehaviour implements Pipeliner {
94
95 private static final Integer QQ_TABLE = 1;
96 private static final short MCAST_VLAN = 4000;
97 private static final String OLTCOOKIES = "olt-cookies-must-be-unique";
98 private final Logger log = getLogger(getClass());
99
100 private ServiceDirectory serviceDirectory;
101 private FlowRuleService flowRuleService;
102 private GroupService groupService;
103 private CoreService coreService;
104 private StorageService storageService;
105
106 private DeviceId deviceId;
107 private ApplicationId appId;
108
109
110 protected FlowObjectiveStore flowObjectiveStore;
111
112 private Cache<GroupKey, NextObjective> pendingGroups;
113
114 protected static KryoNamespace appKryo = new KryoNamespace.Builder()
115 .register(KryoNamespaces.API)
116 .register(GroupKey.class)
117 .register(DefaultGroupKey.class)
118 .register(OLTPipelineGroup.class)
119 .build("OltPipeline");
120 @Override
121 public void init(DeviceId deviceId, PipelinerContext context) {
122 log.debug("Initiate OLT pipeline");
123 this.serviceDirectory = context.directory();
124 this.deviceId = deviceId;
125
126 flowRuleService = serviceDirectory.get(FlowRuleService.class);
127 coreService = serviceDirectory.get(CoreService.class);
128 groupService = serviceDirectory.get(GroupService.class);
129 flowObjectiveStore = context.store();
130 storageService = serviceDirectory.get(StorageService.class);
131
132 appId = coreService.registerApplication(
133 "org.onosproject.driver.OLTPipeline");
134
135
136 pendingGroups = CacheBuilder.newBuilder()
137 .expireAfterWrite(20, TimeUnit.SECONDS)
138 .removalListener((RemovalNotification<GroupKey, NextObjective> notification) -> {
139 if (notification.getCause() == RemovalCause.EXPIRED) {
140 fail(notification.getValue(), ObjectiveError.GROUPINSTALLATIONFAILED);
141 }
142 }).build();
143
144 groupService.addListener(new InnerGroupListener());
145
146 }
147
148 @Override
149 public void filter(FilteringObjective filter) {
150 Instructions.OutputInstruction output;
151
152 if (filter.meta() != null && !filter.meta().immediate().isEmpty()) {
153 output = (Instructions.OutputInstruction) filter.meta().immediate().stream()
154 .filter(t -> t.type().equals(Instruction.Type.OUTPUT))
155 .limit(1)
156 .findFirst().get();
157
158 if (output == null || !output.port().equals(PortNumber.CONTROLLER)) {
159 log.error("OLT can only filter packet to controller");
160 fail(filter, ObjectiveError.UNSUPPORTED);
161 return;
162 }
163 } else {
164 fail(filter, ObjectiveError.BADPARAMS);
165 return;
166 }
167
168 if (filter.key().type() != Criterion.Type.IN_PORT) {
169 fail(filter, ObjectiveError.BADPARAMS);
170 return;
171 }
172
173 EthTypeCriterion ethType = (EthTypeCriterion)
174 filterForCriterion(filter.conditions(), Criterion.Type.ETH_TYPE);
175
176 if (ethType == null) {
177 fail(filter, ObjectiveError.BADPARAMS);
178 return;
179 }
180
181 if (ethType.ethType().equals(EthType.EtherType.EAPOL.ethType())) {
182 provisionEapol(filter, ethType, output);
183 } else if (ethType.ethType().equals(EthType.EtherType.IPV4.ethType())) {
184 IPProtocolCriterion ipProto = (IPProtocolCriterion)
185 filterForCriterion(filter.conditions(), Criterion.Type.IP_PROTO);
186 if (ipProto.protocol() == IPv4.PROTOCOL_IGMP) {
187 provisionIgmp(filter, ethType, ipProto, output);
188 } else {
189 log.error("OLT can only filter igmp");
190 fail(filter, ObjectiveError.UNSUPPORTED);
191 }
192 } else {
193 log.error("OLT can only filter eapol and igmp");
194 fail(filter, ObjectiveError.UNSUPPORTED);
195 }
196
197 }
198
199 private void installObjective(FlowRule.Builder ruleBuilder, Objective objective) {
200 FlowRuleOperations.Builder flowBuilder = FlowRuleOperations.builder();
201 switch (objective.op()) {
202
203 case ADD:
204 flowBuilder.add(ruleBuilder.build());
205 break;
206 case REMOVE:
207 flowBuilder.remove(ruleBuilder.build());
208 break;
209 default:
210 log.warn("Unknown operation {}", objective.op());
211 }
212
213 flowRuleService.apply(flowBuilder.build(new FlowRuleOperationsContext() {
214 @Override
215 public void onSuccess(FlowRuleOperations ops) {
216 objective.context().ifPresent(context -> context.onSuccess(objective));
217 }
218
219 @Override
220 public void onError(FlowRuleOperations ops) {
221 objective.context()
222 .ifPresent(context -> context.onError(objective, ObjectiveError.FLOWINSTALLATIONFAILED));
223 }
224 }));
225 }
226
227 @Override
228 public void forward(ForwardingObjective fwd) {
229
230 if (checkForMulticast(fwd)) {
231 processMulticastRule(fwd);
232 return;
233 }
234
235 TrafficTreatment treatment = fwd.treatment();
236
237 List<Instruction> instructions = treatment.allInstructions();
238
239 Optional<Instruction> vlanIntruction = instructions.stream()
240 .filter(i -> i.type() == Instruction.Type.L2MODIFICATION)
241 .filter(i -> ((L2ModificationInstruction) i).subtype() ==
242 L2ModificationInstruction.L2SubType.VLAN_PUSH ||
243 ((L2ModificationInstruction) i).subtype() ==
244 L2ModificationInstruction.L2SubType.VLAN_POP)
245 .findAny();
246
247 if (vlanIntruction.isPresent()) {
248 L2ModificationInstruction vlanIns =
249 (L2ModificationInstruction) vlanIntruction.get();
250
251 if (vlanIns.subtype() == L2ModificationInstruction.L2SubType.VLAN_PUSH) {
252 installUpstreamRules(fwd);
253 } else if (vlanIns.subtype() == L2ModificationInstruction.L2SubType.VLAN_POP) {
254 installDownstreamRules(fwd);
255 } else {
256 log.error("Unknown OLT operation: {}", fwd);
257 fail(fwd, ObjectiveError.UNSUPPORTED);
258 return;
259 }
260
261 pass(fwd);
262 } else {
263 TrafficSelector selector = fwd.selector();
264
265 if (fwd.treatment() != null) {
266 // Deal with SPECIFIC and VERSATILE in the same manner.
267 FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
268 .forDevice(deviceId)
269 .withSelector(selector)
270 .fromApp(fwd.appId())
271 .withPriority(fwd.priority())
272 .withTreatment(fwd.treatment());
273
274 if (fwd.permanent()) {
275 ruleBuilder.makePermanent();
276 } else {
277 ruleBuilder.makeTemporary(fwd.timeout());
278 }
279 installObjective(ruleBuilder, fwd);
280
281 } else {
282 log.error("No treatment error: {}", fwd);
283 fail(fwd, ObjectiveError.UNSUPPORTED);
284 }
285 }
286
287 }
288
289
290 @Override
291 public void next(NextObjective nextObjective) {
292 if (nextObjective.type() != NextObjective.Type.BROADCAST) {
293 log.error("OLT only supports broadcast groups.");
294 fail(nextObjective, ObjectiveError.BADPARAMS);
295 }
296
297 if (nextObjective.next().size() != 1) {
298 log.error("OLT only supports singleton broadcast groups.");
299 fail(nextObjective, ObjectiveError.BADPARAMS);
300 }
301
302 TrafficTreatment treatment = nextObjective.next().stream().findFirst().get();
303
304
305 GroupBucket bucket = DefaultGroupBucket.createAllGroupBucket(treatment);
306 GroupKey key = new DefaultGroupKey(appKryo.serialize(nextObjective.id()));
307
308
309 pendingGroups.put(key, nextObjective);
310
311 switch (nextObjective.op()) {
312 case ADD:
313 GroupDescription groupDesc =
314 new DefaultGroupDescription(deviceId,
315 GroupDescription.Type.ALL,
316 new GroupBuckets(Collections.singletonList(bucket)),
317 key,
318 null,
319 nextObjective.appId());
320 groupService.addGroup(groupDesc);
321 break;
322 case REMOVE:
323 groupService.removeGroup(deviceId, key, nextObjective.appId());
324 break;
325 case ADD_TO_EXISTING:
326 groupService.addBucketsToGroup(deviceId, key,
327 new GroupBuckets(Collections.singletonList(bucket)),
328 key, nextObjective.appId());
329 break;
330 case REMOVE_FROM_EXISTING:
331 groupService.removeBucketsFromGroup(deviceId, key,
332 new GroupBuckets(Collections.singletonList(bucket)),
333 key, nextObjective.appId());
334 break;
335 default:
336 log.warn("Unknown next objective operation: {}", nextObjective.op());
337 }
338
339
340 }
341
342 private void processMulticastRule(ForwardingObjective fwd) {
343 if (fwd.nextId() == null) {
344 log.error("Multicast objective does not have a next id");
345 fail(fwd, ObjectiveError.BADPARAMS);
346 }
347
348 GroupKey key = getGroupForNextObjective(fwd.nextId());
349
350 if (key == null) {
351 log.error("Group for forwarding objective missing: {}", fwd);
352 fail(fwd, ObjectiveError.GROUPMISSING);
353 }
354
355 Group group = groupService.getGroup(deviceId, key);
356 TrafficTreatment treatment =
357 buildTreatment(Instructions.createGroup(group.id()));
358
359 FlowRule rule = DefaultFlowRule.builder()
360 .fromApp(fwd.appId())
361 .forDevice(deviceId)
362 .forTable(0)
363 .makePermanent()
364 .withPriority(fwd.priority())
365 .withSelector(fwd.selector())
366 .withTreatment(treatment)
367 .build();
368
369 FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
370 switch (fwd.op()) {
371
372 case ADD:
373 builder.add(rule);
374 break;
375 case REMOVE:
376 builder.remove(rule);
377 break;
378 case ADD_TO_EXISTING:
379 case REMOVE_FROM_EXISTING:
380 break;
381 default:
382 log.warn("Unknown forwarding operation: {}", fwd.op());
383 }
384
385 applyFlowRules(builder, fwd);
386
387 }
388
389 private boolean checkForMulticast(ForwardingObjective fwd) {
390
391 IPCriterion ip = (IPCriterion) filterForCriterion(fwd.selector().criteria(),
392 Criterion.Type.IPV4_DST);
393
394 if (ip == null) {
395 return false;
396 }
397
398 return ip.ip().isMulticast();
399
400 }
401
402 private GroupKey getGroupForNextObjective(Integer nextId) {
403 NextGroup next = flowObjectiveStore.getNextGroup(nextId);
404 return appKryo.deserialize(next.data());
405
406 }
407
408 private void installDownstreamRules(ForwardingObjective fwd) {
409 List<Pair<Instruction, Instruction>> vlanOps =
410 vlanOps(fwd,
411 L2ModificationInstruction.L2SubType.VLAN_POP);
412
413 if (vlanOps == null) {
414 return;
415 }
416
417 Instructions.OutputInstruction output = (Instructions.OutputInstruction) fetchOutput(fwd, "downstream");
418
419 if (output == null) {
420 return;
421 }
422
423 Pair<Instruction, Instruction> popAndRewrite = vlanOps.remove(0);
424
425 TrafficSelector selector = fwd.selector();
426
427 Criterion outerVlan = selector.getCriterion(Criterion.Type.VLAN_VID);
428 Criterion innerVlan = selector.getCriterion(Criterion.Type.INNER_VLAN_VID);
429 Criterion inport = selector.getCriterion(Criterion.Type.IN_PORT);
430 Criterion bullshit = Criteria.matchMetadata(output.port().toLong());
431
432 if (outerVlan == null || innerVlan == null || inport == null) {
433 log.error("Forwarding objective is underspecified: {}", fwd);
434 fail(fwd, ObjectiveError.BADPARAMS);
435 return;
436 }
437
438 Criterion innerVid = Criteria.matchVlanId(((VlanIdCriterion) innerVlan).vlanId());
439
440 FlowRule.Builder outer = DefaultFlowRule.builder()
441 .fromApp(fwd.appId())
442 .forDevice(deviceId)
443 .makePermanent()
444 .withPriority(fwd.priority())
445 .withSelector(buildSelector(inport, outerVlan, bullshit))
446 .withTreatment(buildTreatment(popAndRewrite.getLeft(),
447 Instructions.transition(QQ_TABLE)));
448
449 FlowRule.Builder inner = DefaultFlowRule.builder()
450 .fromApp(fwd.appId())
451 .forDevice(deviceId)
452 .forTable(QQ_TABLE)
453 .makePermanent()
454 .withPriority(fwd.priority())
455 .withSelector(buildSelector(inport, innerVid))
456 .withTreatment(buildTreatment(popAndRewrite.getLeft(),
457 output));
458
459 applyRules(fwd, inner, outer);
460
461 }
462
463 private boolean hasUntaggedVlanTag(TrafficSelector selector) {
464 Iterator<Criterion> iter = selector.criteria().iterator();
465
466 while (iter.hasNext()) {
467 Criterion criterion = iter.next();
468 if (criterion.type() == Criterion.Type.VLAN_VID &&
469 ((VlanIdCriterion) criterion).vlanId().toShort() == VlanId.UNTAGGED) {
470 return true;
471 }
472 }
473
474 return false;
475 }
476
477 private void installUpstreamRules(ForwardingObjective fwd) {
478 List<Pair<Instruction, Instruction>> vlanOps =
479 vlanOps(fwd,
480 L2ModificationInstruction.L2SubType.VLAN_PUSH);
481 FlowRule.Builder inner;
482
483 if (vlanOps == null) {
484 return;
485 }
486
487 Instruction output = fetchOutput(fwd, "upstream");
488
489 if (output == null) {
490 return;
491 }
492
493 Pair<Instruction, Instruction> innerPair = vlanOps.remove(0);
494
495 Pair<Instruction, Instruction> outerPair = vlanOps.remove(0);
496
497
498 if (hasUntaggedVlanTag(fwd.selector())) {
499 inner = DefaultFlowRule.builder()
500 .fromApp(fwd.appId())
501 .forDevice(deviceId)
502 .makePermanent()
503 .withPriority(fwd.priority())
504 .withSelector(fwd.selector())
505 .withTreatment(buildTreatment(innerPair.getLeft(),
506 innerPair.getRight(),
507 Instructions.transition(QQ_TABLE)));
508 } else {
509 inner = DefaultFlowRule.builder()
510 .fromApp(fwd.appId())
511 .forDevice(deviceId)
512 .makePermanent()
513 .withPriority(fwd.priority())
514 .withSelector(fwd.selector())
515 .withTreatment(buildTreatment(
516 innerPair.getRight(),
517 Instructions.transition(QQ_TABLE)));
518 }
519
520
521 PortCriterion inPort = (PortCriterion)
522 fwd.selector().getCriterion(Criterion.Type.IN_PORT);
523
524 VlanId cVlanId = ((L2ModificationInstruction.ModVlanIdInstruction)
525 innerPair.getRight()).vlanId();
526
527 FlowRule.Builder outer = DefaultFlowRule.builder()
528 .fromApp(fwd.appId())
529 .forDevice(deviceId)
530 .forTable(QQ_TABLE)
531 .makePermanent()
532 .withPriority(fwd.priority())
533 .withSelector(buildSelector(inPort,
534 Criteria.matchVlanId(cVlanId)))
535 .withTreatment(buildTreatment(outerPair.getLeft(),
536 outerPair.getRight(),
537 output));
538
539 applyRules(fwd, inner, outer);
540
541 }
542
543 private Instruction fetchOutput(ForwardingObjective fwd, String direction) {
544 Instruction output = fwd.treatment().allInstructions().stream()
545 .filter(i -> i.type() == Instruction.Type.OUTPUT)
546 .findFirst().orElse(null);
547
548 if (output == null) {
549 log.error("OLT {} rule has no output", direction);
550 fail(fwd, ObjectiveError.BADPARAMS);
551 return null;
552 }
553 return output;
554 }
555
556 private List<Pair<Instruction, Instruction>> vlanOps(ForwardingObjective fwd,
557 L2ModificationInstruction.L2SubType type) {
558
559 List<Pair<Instruction, Instruction>> vlanOps = findVlanOps(
560 fwd.treatment().allInstructions(), type);
561
562 if (vlanOps == null) {
563 String direction = type == L2ModificationInstruction.L2SubType.VLAN_POP
564 ? "downstream" : "upstream";
565 log.error("Missing vlan operations in {} forwarding: {}", direction, fwd);
566 fail(fwd, ObjectiveError.BADPARAMS);
567 return null;
568 }
569 return vlanOps;
570 }
571
572
573 private List<Pair<Instruction, Instruction>> findVlanOps(List<Instruction> instructions,
574 L2ModificationInstruction.L2SubType type) {
575
576 List<Instruction> vlanPushs = findL2Instructions(
577 type,
578 instructions);
579 List<Instruction> vlanSets = findL2Instructions(
580 L2ModificationInstruction.L2SubType.VLAN_ID,
581 instructions);
582
583 if (vlanPushs.size() != vlanSets.size()) {
584 return null;
585 }
586
587 List<Pair<Instruction, Instruction>> pairs = Lists.newArrayList();
588
589 for (int i = 0; i < vlanPushs.size(); i++) {
590 pairs.add(new ImmutablePair<>(vlanPushs.get(i), vlanSets.get(i)));
591 }
592 return pairs;
593 }
594
595 private List<Instruction> findL2Instructions(L2ModificationInstruction.L2SubType subType,
596 List<Instruction> actions) {
597 return actions.stream()
598 .filter(i -> i.type() == Instruction.Type.L2MODIFICATION)
599 .filter(i -> ((L2ModificationInstruction) i).subtype() == subType)
600 .collect(Collectors.toList());
601 }
602
603 private void provisionEapol(FilteringObjective filter,
604 EthTypeCriterion ethType,
605 Instructions.OutputInstruction output) {
606
607 TrafficSelector selector = buildSelector(filter.key(), ethType);
608 TrafficTreatment treatment = buildTreatment(output);
609 buildAndApplyRule(filter, selector, treatment);
610
611 }
612
613 private void provisionIgmp(FilteringObjective filter, EthTypeCriterion ethType,
614 IPProtocolCriterion ipProto,
615 Instructions.OutputInstruction output) {
616 TrafficSelector selector = buildSelector(filter.key(), ethType, ipProto);
617 TrafficTreatment treatment = buildTreatment(output);
618 buildAndApplyRule(filter, selector, treatment);
619 }
620
621 private void buildAndApplyRule(FilteringObjective filter, TrafficSelector selector,
622 TrafficTreatment treatment) {
623 FlowRule rule = DefaultFlowRule.builder()
624 .fromApp(filter.appId())
625 .forDevice(deviceId)
626 .forTable(0)
627 .makePermanent()
628 .withSelector(selector)
629 .withTreatment(treatment)
630 .withPriority(filter.priority())
631 .build();
632
633 FlowRuleOperations.Builder opsBuilder = FlowRuleOperations.builder();
634
635 switch (filter.type()) {
636 case PERMIT:
637 opsBuilder.add(rule);
638 break;
639 case DENY:
640 opsBuilder.remove(rule);
641 break;
642 default:
643 log.warn("Unknown filter type : {}", filter.type());
644 fail(filter, ObjectiveError.UNSUPPORTED);
645 }
646
647 applyFlowRules(opsBuilder, filter);
648 }
649
650 private void applyRules(ForwardingObjective fwd,
651 FlowRule.Builder inner, FlowRule.Builder outer) {
652 FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
653 switch (fwd.op()) {
654 case ADD:
655 builder.add(inner.build()).add(outer.build());
656 break;
657 case REMOVE:
658 builder.remove(inner.build()).remove(outer.build());
659 break;
660 case ADD_TO_EXISTING:
661 break;
662 case REMOVE_FROM_EXISTING:
663 break;
664 default:
665 log.warn("Unknown forwarding operation: {}", fwd.op());
666 }
667
668 applyFlowRules(builder, fwd);
669 }
670
671 private void applyFlowRules(FlowRuleOperations.Builder builder,
672 Objective objective) {
673 flowRuleService.apply(builder.build(new FlowRuleOperationsContext() {
674 @Override
675 public void onSuccess(FlowRuleOperations ops) {
676 pass(objective);
677 }
678
679 @Override
680 public void onError(FlowRuleOperations ops) {
681 fail(objective, ObjectiveError.FLOWINSTALLATIONFAILED);
682 }
683 }));
684 }
685
686 private Criterion filterForCriterion(Collection<Criterion> criteria, Criterion.Type type) {
687 return criteria.stream()
688 .filter(c -> c.type().equals(type))
689 .limit(1)
690 .findFirst().orElse(null);
691 }
692
693 private TrafficSelector buildSelector(Criterion... criteria) {
694
695
696 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
697
698 for (Criterion c : criteria) {
699 sBuilder.add(c);
700 }
701
702 return sBuilder.build();
703 }
704
705 private TrafficTreatment buildTreatment(Instruction... instructions) {
706
707
708 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
709
710 for (Instruction i : instructions) {
711 tBuilder.add(i);
712 }
713
714 return tBuilder.build();
715 }
716
717
718 private void fail(Objective obj, ObjectiveError error) {
719 obj.context().ifPresent(context -> context.onError(obj, error));
720 }
721
722 private void pass(Objective obj) {
723 obj.context().ifPresent(context -> context.onSuccess(obj));
724 }
725
726
727 private class InnerGroupListener implements GroupListener {
728 @Override
729 public void event(GroupEvent event) {
730 if (event.type() == GroupEvent.Type.GROUP_ADDED || event.type() == GroupEvent.Type.GROUP_UPDATED) {
731 GroupKey key = event.subject().appCookie();
732
733 NextObjective obj = pendingGroups.getIfPresent(key);
734 if (obj != null) {
735 flowObjectiveStore.putNextGroup(obj.id(), new OLTPipelineGroup(key));
736 pass(obj);
737 pendingGroups.invalidate(key);
738 }
739 }
740 }
741 }
742
743 private static class OLTPipelineGroup implements NextGroup {
744
745 private final GroupKey key;
746
747 public OLTPipelineGroup(GroupKey key) {
748 this.key = key;
749 }
750
751 public GroupKey key() {
752 return key;
753 }
754
755 @Override
756 public byte[] data() {
757 return appKryo.serialize(key);
758 }
759
760 }
761
762 @Override
763 public List<String> getNextMappings(NextGroup nextGroup) {
764 // TODO Implementation deferred to vendor
765 return null;
766 }
767}