blob: 640321f5cff95e42d7ea7e4f1a562189e22c158f [file] [log] [blame]
ke hana1cb2512016-09-29 15:15:59 +08001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.driver.pipeline;
17
18import com.google.common.cache.Cache;
19import com.google.common.cache.CacheBuilder;
20import com.google.common.cache.RemovalCause;
21import com.google.common.cache.RemovalNotification;
22import com.google.common.collect.Lists;
23import org.apache.commons.lang3.tuple.ImmutablePair;
24import org.apache.commons.lang3.tuple.Pair;
25import org.onlab.osgi.ServiceDirectory;
26import org.onlab.packet.EthType;
27import org.onlab.packet.IPv4;
28import org.onlab.packet.VlanId;
29import org.onlab.util.KryoNamespace;
30import org.onosproject.core.ApplicationId;
31import org.onosproject.core.CoreService;
32import org.onosproject.net.DeviceId;
33import org.onosproject.net.PortNumber;
34import org.onosproject.net.behaviour.NextGroup;
35import org.onosproject.net.behaviour.Pipeliner;
36import org.onosproject.net.behaviour.PipelinerContext;
37import org.onosproject.net.driver.AbstractHandlerBehaviour;
38import org.onosproject.net.flow.DefaultFlowRule;
39import org.onosproject.net.flow.DefaultTrafficSelector;
40import org.onosproject.net.flow.DefaultTrafficTreatment;
41import org.onosproject.net.flow.FlowRule;
42import org.onosproject.net.flow.FlowRuleOperations;
43import org.onosproject.net.flow.FlowRuleOperationsContext;
44import org.onosproject.net.flow.FlowRuleService;
45import org.onosproject.net.flow.TrafficSelector;
46import org.onosproject.net.flow.TrafficTreatment;
47import org.onosproject.net.flow.criteria.Criteria;
48import org.onosproject.net.flow.criteria.Criterion;
49import org.onosproject.net.flow.criteria.EthTypeCriterion;
50import org.onosproject.net.flow.criteria.IPCriterion;
51import org.onosproject.net.flow.criteria.IPProtocolCriterion;
52import org.onosproject.net.flow.criteria.PortCriterion;
53import org.onosproject.net.flow.criteria.VlanIdCriterion;
54import org.onosproject.net.flow.instructions.Instruction;
55import org.onosproject.net.flow.instructions.Instructions;
56import org.onosproject.net.flow.instructions.L2ModificationInstruction;
57import org.onosproject.net.flowobjective.FilteringObjective;
58import org.onosproject.net.flowobjective.FlowObjectiveStore;
59import org.onosproject.net.flowobjective.ForwardingObjective;
60import org.onosproject.net.flowobjective.NextObjective;
61import org.onosproject.net.flowobjective.Objective;
62import org.onosproject.net.flowobjective.ObjectiveError;
63import org.onosproject.net.group.DefaultGroupBucket;
64import org.onosproject.net.group.DefaultGroupDescription;
65import org.onosproject.net.group.DefaultGroupKey;
66import org.onosproject.net.group.Group;
67import org.onosproject.net.group.GroupBucket;
68import org.onosproject.net.group.GroupBuckets;
69import org.onosproject.net.group.GroupDescription;
70import org.onosproject.net.group.GroupEvent;
71import org.onosproject.net.group.GroupKey;
72import org.onosproject.net.group.GroupListener;
73import org.onosproject.net.group.GroupService;
74import org.onosproject.store.serializers.KryoNamespaces;
75import org.onosproject.store.service.StorageService;
76import org.slf4j.Logger;
77
78import java.util.Collection;
79import java.util.Collections;
80import java.util.List;
81import java.util.Optional;
82import java.util.concurrent.TimeUnit;
83import java.util.stream.Collectors;
84import java.util.Iterator;
85
86
87import static org.slf4j.LoggerFactory.getLogger;
88
89/**
90 * Pipeliner for OLT device.
91 */
92
93public class NokiaOltPipeline extends AbstractHandlerBehaviour implements Pipeliner {
94
95 private static final Integer QQ_TABLE = 1;
96 private static final short MCAST_VLAN = 4000;
97 private static final String OLTCOOKIES = "olt-cookies-must-be-unique";
ke hanb1fbddb2016-12-15 10:56:32 +080098 private static final int EAPOL_FLOW_PRIORITY = 1200;
ke hana1cb2512016-09-29 15:15:59 +080099 private final Logger log = getLogger(getClass());
100
101 private ServiceDirectory serviceDirectory;
102 private FlowRuleService flowRuleService;
103 private GroupService groupService;
104 private CoreService coreService;
105 private StorageService storageService;
106
107 private DeviceId deviceId;
108 private ApplicationId appId;
109
110
111 protected FlowObjectiveStore flowObjectiveStore;
112
113 private Cache<GroupKey, NextObjective> pendingGroups;
114
115 protected static KryoNamespace appKryo = new KryoNamespace.Builder()
116 .register(KryoNamespaces.API)
117 .register(GroupKey.class)
118 .register(DefaultGroupKey.class)
119 .register(OLTPipelineGroup.class)
120 .build("OltPipeline");
121 @Override
122 public void init(DeviceId deviceId, PipelinerContext context) {
123 log.debug("Initiate OLT pipeline");
124 this.serviceDirectory = context.directory();
125 this.deviceId = deviceId;
126
127 flowRuleService = serviceDirectory.get(FlowRuleService.class);
128 coreService = serviceDirectory.get(CoreService.class);
129 groupService = serviceDirectory.get(GroupService.class);
130 flowObjectiveStore = context.store();
131 storageService = serviceDirectory.get(StorageService.class);
132
133 appId = coreService.registerApplication(
134 "org.onosproject.driver.OLTPipeline");
135
136
137 pendingGroups = CacheBuilder.newBuilder()
138 .expireAfterWrite(20, TimeUnit.SECONDS)
139 .removalListener((RemovalNotification<GroupKey, NextObjective> notification) -> {
140 if (notification.getCause() == RemovalCause.EXPIRED) {
141 fail(notification.getValue(), ObjectiveError.GROUPINSTALLATIONFAILED);
142 }
143 }).build();
144
145 groupService.addListener(new InnerGroupListener());
146
147 }
148
149 @Override
150 public void filter(FilteringObjective filter) {
151 Instructions.OutputInstruction output;
152
153 if (filter.meta() != null && !filter.meta().immediate().isEmpty()) {
154 output = (Instructions.OutputInstruction) filter.meta().immediate().stream()
155 .filter(t -> t.type().equals(Instruction.Type.OUTPUT))
156 .limit(1)
157 .findFirst().get();
158
159 if (output == null || !output.port().equals(PortNumber.CONTROLLER)) {
160 log.error("OLT can only filter packet to controller");
161 fail(filter, ObjectiveError.UNSUPPORTED);
162 return;
163 }
164 } else {
165 fail(filter, ObjectiveError.BADPARAMS);
166 return;
167 }
168
169 if (filter.key().type() != Criterion.Type.IN_PORT) {
170 fail(filter, ObjectiveError.BADPARAMS);
171 return;
172 }
173
174 EthTypeCriterion ethType = (EthTypeCriterion)
175 filterForCriterion(filter.conditions(), Criterion.Type.ETH_TYPE);
176
177 if (ethType == null) {
178 fail(filter, ObjectiveError.BADPARAMS);
179 return;
180 }
181
182 if (ethType.ethType().equals(EthType.EtherType.EAPOL.ethType())) {
183 provisionEapol(filter, ethType, output);
184 } else if (ethType.ethType().equals(EthType.EtherType.IPV4.ethType())) {
185 IPProtocolCriterion ipProto = (IPProtocolCriterion)
186 filterForCriterion(filter.conditions(), Criterion.Type.IP_PROTO);
187 if (ipProto.protocol() == IPv4.PROTOCOL_IGMP) {
188 provisionIgmp(filter, ethType, ipProto, output);
189 } else {
190 log.error("OLT can only filter igmp");
191 fail(filter, ObjectiveError.UNSUPPORTED);
192 }
193 } else {
194 log.error("OLT can only filter eapol and igmp");
195 fail(filter, ObjectiveError.UNSUPPORTED);
196 }
197
198 }
199
200 private void installObjective(FlowRule.Builder ruleBuilder, Objective objective) {
201 FlowRuleOperations.Builder flowBuilder = FlowRuleOperations.builder();
202 switch (objective.op()) {
203
204 case ADD:
205 flowBuilder.add(ruleBuilder.build());
206 break;
207 case REMOVE:
208 flowBuilder.remove(ruleBuilder.build());
209 break;
210 default:
211 log.warn("Unknown operation {}", objective.op());
212 }
213
214 flowRuleService.apply(flowBuilder.build(new FlowRuleOperationsContext() {
215 @Override
216 public void onSuccess(FlowRuleOperations ops) {
217 objective.context().ifPresent(context -> context.onSuccess(objective));
218 }
219
220 @Override
221 public void onError(FlowRuleOperations ops) {
222 objective.context()
223 .ifPresent(context -> context.onError(objective, ObjectiveError.FLOWINSTALLATIONFAILED));
224 }
225 }));
226 }
227
228 @Override
229 public void forward(ForwardingObjective fwd) {
230
231 if (checkForMulticast(fwd)) {
232 processMulticastRule(fwd);
233 return;
234 }
235
236 TrafficTreatment treatment = fwd.treatment();
237
238 List<Instruction> instructions = treatment.allInstructions();
239
240 Optional<Instruction> vlanIntruction = instructions.stream()
241 .filter(i -> i.type() == Instruction.Type.L2MODIFICATION)
242 .filter(i -> ((L2ModificationInstruction) i).subtype() ==
243 L2ModificationInstruction.L2SubType.VLAN_PUSH ||
244 ((L2ModificationInstruction) i).subtype() ==
245 L2ModificationInstruction.L2SubType.VLAN_POP)
246 .findAny();
247
248 if (vlanIntruction.isPresent()) {
249 L2ModificationInstruction vlanIns =
250 (L2ModificationInstruction) vlanIntruction.get();
251
252 if (vlanIns.subtype() == L2ModificationInstruction.L2SubType.VLAN_PUSH) {
253 installUpstreamRules(fwd);
254 } else if (vlanIns.subtype() == L2ModificationInstruction.L2SubType.VLAN_POP) {
255 installDownstreamRules(fwd);
256 } else {
257 log.error("Unknown OLT operation: {}", fwd);
258 fail(fwd, ObjectiveError.UNSUPPORTED);
259 return;
260 }
261
262 pass(fwd);
263 } else {
264 TrafficSelector selector = fwd.selector();
265
266 if (fwd.treatment() != null) {
267 // Deal with SPECIFIC and VERSATILE in the same manner.
268 FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
269 .forDevice(deviceId)
270 .withSelector(selector)
271 .fromApp(fwd.appId())
272 .withPriority(fwd.priority())
273 .withTreatment(fwd.treatment());
274
275 if (fwd.permanent()) {
276 ruleBuilder.makePermanent();
277 } else {
278 ruleBuilder.makeTemporary(fwd.timeout());
279 }
280 installObjective(ruleBuilder, fwd);
281
282 } else {
283 log.error("No treatment error: {}", fwd);
284 fail(fwd, ObjectiveError.UNSUPPORTED);
285 }
286 }
287
288 }
289
290
291 @Override
292 public void next(NextObjective nextObjective) {
293 if (nextObjective.type() != NextObjective.Type.BROADCAST) {
294 log.error("OLT only supports broadcast groups.");
295 fail(nextObjective, ObjectiveError.BADPARAMS);
296 }
297
298 if (nextObjective.next().size() != 1) {
299 log.error("OLT only supports singleton broadcast groups.");
300 fail(nextObjective, ObjectiveError.BADPARAMS);
301 }
302
303 TrafficTreatment treatment = nextObjective.next().stream().findFirst().get();
304
305
306 GroupBucket bucket = DefaultGroupBucket.createAllGroupBucket(treatment);
307 GroupKey key = new DefaultGroupKey(appKryo.serialize(nextObjective.id()));
308
309
310 pendingGroups.put(key, nextObjective);
311
312 switch (nextObjective.op()) {
313 case ADD:
314 GroupDescription groupDesc =
315 new DefaultGroupDescription(deviceId,
316 GroupDescription.Type.ALL,
317 new GroupBuckets(Collections.singletonList(bucket)),
318 key,
319 null,
320 nextObjective.appId());
321 groupService.addGroup(groupDesc);
322 break;
323 case REMOVE:
324 groupService.removeGroup(deviceId, key, nextObjective.appId());
325 break;
326 case ADD_TO_EXISTING:
327 groupService.addBucketsToGroup(deviceId, key,
328 new GroupBuckets(Collections.singletonList(bucket)),
329 key, nextObjective.appId());
330 break;
331 case REMOVE_FROM_EXISTING:
332 groupService.removeBucketsFromGroup(deviceId, key,
333 new GroupBuckets(Collections.singletonList(bucket)),
334 key, nextObjective.appId());
335 break;
336 default:
337 log.warn("Unknown next objective operation: {}", nextObjective.op());
338 }
339
340
341 }
342
343 private void processMulticastRule(ForwardingObjective fwd) {
344 if (fwd.nextId() == null) {
345 log.error("Multicast objective does not have a next id");
346 fail(fwd, ObjectiveError.BADPARAMS);
347 }
348
349 GroupKey key = getGroupForNextObjective(fwd.nextId());
350
351 if (key == null) {
352 log.error("Group for forwarding objective missing: {}", fwd);
353 fail(fwd, ObjectiveError.GROUPMISSING);
354 }
355
356 Group group = groupService.getGroup(deviceId, key);
357 TrafficTreatment treatment =
358 buildTreatment(Instructions.createGroup(group.id()));
359
360 FlowRule rule = DefaultFlowRule.builder()
361 .fromApp(fwd.appId())
362 .forDevice(deviceId)
363 .forTable(0)
364 .makePermanent()
365 .withPriority(fwd.priority())
366 .withSelector(fwd.selector())
367 .withTreatment(treatment)
368 .build();
369
370 FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
371 switch (fwd.op()) {
372
373 case ADD:
374 builder.add(rule);
375 break;
376 case REMOVE:
377 builder.remove(rule);
378 break;
379 case ADD_TO_EXISTING:
380 case REMOVE_FROM_EXISTING:
381 break;
382 default:
383 log.warn("Unknown forwarding operation: {}", fwd.op());
384 }
385
386 applyFlowRules(builder, fwd);
387
388 }
389
390 private boolean checkForMulticast(ForwardingObjective fwd) {
391
392 IPCriterion ip = (IPCriterion) filterForCriterion(fwd.selector().criteria(),
393 Criterion.Type.IPV4_DST);
394
395 if (ip == null) {
396 return false;
397 }
398
399 return ip.ip().isMulticast();
400
401 }
402
403 private GroupKey getGroupForNextObjective(Integer nextId) {
404 NextGroup next = flowObjectiveStore.getNextGroup(nextId);
405 return appKryo.deserialize(next.data());
406
407 }
408
409 private void installDownstreamRules(ForwardingObjective fwd) {
410 List<Pair<Instruction, Instruction>> vlanOps =
411 vlanOps(fwd,
412 L2ModificationInstruction.L2SubType.VLAN_POP);
413
414 if (vlanOps == null) {
415 return;
416 }
417
418 Instructions.OutputInstruction output = (Instructions.OutputInstruction) fetchOutput(fwd, "downstream");
419
420 if (output == null) {
421 return;
422 }
423
424 Pair<Instruction, Instruction> popAndRewrite = vlanOps.remove(0);
425
426 TrafficSelector selector = fwd.selector();
427
428 Criterion outerVlan = selector.getCriterion(Criterion.Type.VLAN_VID);
429 Criterion innerVlan = selector.getCriterion(Criterion.Type.INNER_VLAN_VID);
430 Criterion inport = selector.getCriterion(Criterion.Type.IN_PORT);
431 Criterion bullshit = Criteria.matchMetadata(output.port().toLong());
432
433 if (outerVlan == null || innerVlan == null || inport == null) {
434 log.error("Forwarding objective is underspecified: {}", fwd);
435 fail(fwd, ObjectiveError.BADPARAMS);
436 return;
437 }
438
439 Criterion innerVid = Criteria.matchVlanId(((VlanIdCriterion) innerVlan).vlanId());
440
441 FlowRule.Builder outer = DefaultFlowRule.builder()
442 .fromApp(fwd.appId())
443 .forDevice(deviceId)
444 .makePermanent()
445 .withPriority(fwd.priority())
446 .withSelector(buildSelector(inport, outerVlan, bullshit))
447 .withTreatment(buildTreatment(popAndRewrite.getLeft(),
448 Instructions.transition(QQ_TABLE)));
449
450 FlowRule.Builder inner = DefaultFlowRule.builder()
451 .fromApp(fwd.appId())
452 .forDevice(deviceId)
453 .forTable(QQ_TABLE)
454 .makePermanent()
455 .withPriority(fwd.priority())
456 .withSelector(buildSelector(inport, innerVid))
457 .withTreatment(buildTreatment(popAndRewrite.getLeft(),
458 output));
459
460 applyRules(fwd, inner, outer);
461
462 }
463
464 private boolean hasUntaggedVlanTag(TrafficSelector selector) {
465 Iterator<Criterion> iter = selector.criteria().iterator();
466
467 while (iter.hasNext()) {
468 Criterion criterion = iter.next();
469 if (criterion.type() == Criterion.Type.VLAN_VID &&
470 ((VlanIdCriterion) criterion).vlanId().toShort() == VlanId.UNTAGGED) {
471 return true;
472 }
473 }
474
475 return false;
476 }
477
478 private void installUpstreamRules(ForwardingObjective fwd) {
479 List<Pair<Instruction, Instruction>> vlanOps =
480 vlanOps(fwd,
481 L2ModificationInstruction.L2SubType.VLAN_PUSH);
482 FlowRule.Builder inner;
483
484 if (vlanOps == null) {
485 return;
486 }
487
488 Instruction output = fetchOutput(fwd, "upstream");
489
490 if (output == null) {
491 return;
492 }
493
494 Pair<Instruction, Instruction> innerPair = vlanOps.remove(0);
495
496 Pair<Instruction, Instruction> outerPair = vlanOps.remove(0);
497
498
499 if (hasUntaggedVlanTag(fwd.selector())) {
500 inner = DefaultFlowRule.builder()
501 .fromApp(fwd.appId())
502 .forDevice(deviceId)
503 .makePermanent()
504 .withPriority(fwd.priority())
505 .withSelector(fwd.selector())
506 .withTreatment(buildTreatment(innerPair.getLeft(),
507 innerPair.getRight(),
508 Instructions.transition(QQ_TABLE)));
509 } else {
510 inner = DefaultFlowRule.builder()
511 .fromApp(fwd.appId())
512 .forDevice(deviceId)
513 .makePermanent()
514 .withPriority(fwd.priority())
515 .withSelector(fwd.selector())
516 .withTreatment(buildTreatment(
517 innerPair.getRight(),
518 Instructions.transition(QQ_TABLE)));
519 }
520
521
522 PortCriterion inPort = (PortCriterion)
523 fwd.selector().getCriterion(Criterion.Type.IN_PORT);
524
525 VlanId cVlanId = ((L2ModificationInstruction.ModVlanIdInstruction)
526 innerPair.getRight()).vlanId();
527
528 FlowRule.Builder outer = DefaultFlowRule.builder()
529 .fromApp(fwd.appId())
530 .forDevice(deviceId)
531 .forTable(QQ_TABLE)
532 .makePermanent()
533 .withPriority(fwd.priority())
534 .withSelector(buildSelector(inPort,
535 Criteria.matchVlanId(cVlanId)))
536 .withTreatment(buildTreatment(outerPair.getLeft(),
537 outerPair.getRight(),
538 output));
539
540 applyRules(fwd, inner, outer);
541
542 }
543
544 private Instruction fetchOutput(ForwardingObjective fwd, String direction) {
545 Instruction output = fwd.treatment().allInstructions().stream()
546 .filter(i -> i.type() == Instruction.Type.OUTPUT)
547 .findFirst().orElse(null);
548
549 if (output == null) {
550 log.error("OLT {} rule has no output", direction);
551 fail(fwd, ObjectiveError.BADPARAMS);
552 return null;
553 }
554 return output;
555 }
556
557 private List<Pair<Instruction, Instruction>> vlanOps(ForwardingObjective fwd,
558 L2ModificationInstruction.L2SubType type) {
559
560 List<Pair<Instruction, Instruction>> vlanOps = findVlanOps(
561 fwd.treatment().allInstructions(), type);
562
563 if (vlanOps == null) {
564 String direction = type == L2ModificationInstruction.L2SubType.VLAN_POP
565 ? "downstream" : "upstream";
566 log.error("Missing vlan operations in {} forwarding: {}", direction, fwd);
567 fail(fwd, ObjectiveError.BADPARAMS);
568 return null;
569 }
570 return vlanOps;
571 }
572
573
574 private List<Pair<Instruction, Instruction>> findVlanOps(List<Instruction> instructions,
575 L2ModificationInstruction.L2SubType type) {
576
577 List<Instruction> vlanPushs = findL2Instructions(
578 type,
579 instructions);
580 List<Instruction> vlanSets = findL2Instructions(
581 L2ModificationInstruction.L2SubType.VLAN_ID,
582 instructions);
583
584 if (vlanPushs.size() != vlanSets.size()) {
585 return null;
586 }
587
588 List<Pair<Instruction, Instruction>> pairs = Lists.newArrayList();
589
590 for (int i = 0; i < vlanPushs.size(); i++) {
591 pairs.add(new ImmutablePair<>(vlanPushs.get(i), vlanSets.get(i)));
592 }
593 return pairs;
594 }
595
596 private List<Instruction> findL2Instructions(L2ModificationInstruction.L2SubType subType,
597 List<Instruction> actions) {
598 return actions.stream()
599 .filter(i -> i.type() == Instruction.Type.L2MODIFICATION)
600 .filter(i -> ((L2ModificationInstruction) i).subtype() == subType)
601 .collect(Collectors.toList());
602 }
603
604 private void provisionEapol(FilteringObjective filter,
605 EthTypeCriterion ethType,
606 Instructions.OutputInstruction output) {
607
608 TrafficSelector selector = buildSelector(filter.key(), ethType);
609 TrafficTreatment treatment = buildTreatment(output);
ke hanb1fbddb2016-12-15 10:56:32 +0800610 buildAndApplyRule(filter, selector, treatment, EAPOL_FLOW_PRIORITY);
ke hana1cb2512016-09-29 15:15:59 +0800611
612 }
613
614 private void provisionIgmp(FilteringObjective filter, EthTypeCriterion ethType,
615 IPProtocolCriterion ipProto,
616 Instructions.OutputInstruction output) {
617 TrafficSelector selector = buildSelector(filter.key(), ethType, ipProto);
618 TrafficTreatment treatment = buildTreatment(output);
619 buildAndApplyRule(filter, selector, treatment);
620 }
621
622 private void buildAndApplyRule(FilteringObjective filter, TrafficSelector selector,
623 TrafficTreatment treatment) {
ke hanb1fbddb2016-12-15 10:56:32 +0800624 buildAndApplyRule(filter, selector, treatment, filter.priority());
625 }
626
627 private void buildAndApplyRule(FilteringObjective filter, TrafficSelector selector,
628 TrafficTreatment treatment, int priority) {
ke hana1cb2512016-09-29 15:15:59 +0800629 FlowRule rule = DefaultFlowRule.builder()
630 .fromApp(filter.appId())
631 .forDevice(deviceId)
632 .forTable(0)
633 .makePermanent()
634 .withSelector(selector)
635 .withTreatment(treatment)
ke hanb1fbddb2016-12-15 10:56:32 +0800636 .withPriority(priority)
ke hana1cb2512016-09-29 15:15:59 +0800637 .build();
638
639 FlowRuleOperations.Builder opsBuilder = FlowRuleOperations.builder();
640
641 switch (filter.type()) {
642 case PERMIT:
643 opsBuilder.add(rule);
644 break;
645 case DENY:
646 opsBuilder.remove(rule);
647 break;
648 default:
649 log.warn("Unknown filter type : {}", filter.type());
650 fail(filter, ObjectiveError.UNSUPPORTED);
651 }
652
653 applyFlowRules(opsBuilder, filter);
654 }
655
656 private void applyRules(ForwardingObjective fwd,
657 FlowRule.Builder inner, FlowRule.Builder outer) {
658 FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
659 switch (fwd.op()) {
660 case ADD:
661 builder.add(inner.build()).add(outer.build());
662 break;
663 case REMOVE:
664 builder.remove(inner.build()).remove(outer.build());
665 break;
666 case ADD_TO_EXISTING:
667 break;
668 case REMOVE_FROM_EXISTING:
669 break;
670 default:
671 log.warn("Unknown forwarding operation: {}", fwd.op());
672 }
673
674 applyFlowRules(builder, fwd);
675 }
676
677 private void applyFlowRules(FlowRuleOperations.Builder builder,
678 Objective objective) {
679 flowRuleService.apply(builder.build(new FlowRuleOperationsContext() {
680 @Override
681 public void onSuccess(FlowRuleOperations ops) {
682 pass(objective);
683 }
684
685 @Override
686 public void onError(FlowRuleOperations ops) {
687 fail(objective, ObjectiveError.FLOWINSTALLATIONFAILED);
688 }
689 }));
690 }
691
692 private Criterion filterForCriterion(Collection<Criterion> criteria, Criterion.Type type) {
693 return criteria.stream()
694 .filter(c -> c.type().equals(type))
695 .limit(1)
696 .findFirst().orElse(null);
697 }
698
699 private TrafficSelector buildSelector(Criterion... criteria) {
700
701
702 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
703
704 for (Criterion c : criteria) {
705 sBuilder.add(c);
706 }
707
708 return sBuilder.build();
709 }
710
711 private TrafficTreatment buildTreatment(Instruction... instructions) {
712
713
714 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
715
716 for (Instruction i : instructions) {
717 tBuilder.add(i);
718 }
719
720 return tBuilder.build();
721 }
722
723
724 private void fail(Objective obj, ObjectiveError error) {
725 obj.context().ifPresent(context -> context.onError(obj, error));
726 }
727
728 private void pass(Objective obj) {
729 obj.context().ifPresent(context -> context.onSuccess(obj));
730 }
731
732
733 private class InnerGroupListener implements GroupListener {
734 @Override
735 public void event(GroupEvent event) {
736 if (event.type() == GroupEvent.Type.GROUP_ADDED || event.type() == GroupEvent.Type.GROUP_UPDATED) {
737 GroupKey key = event.subject().appCookie();
738
739 NextObjective obj = pendingGroups.getIfPresent(key);
740 if (obj != null) {
741 flowObjectiveStore.putNextGroup(obj.id(), new OLTPipelineGroup(key));
742 pass(obj);
743 pendingGroups.invalidate(key);
744 }
745 }
746 }
747 }
748
749 private static class OLTPipelineGroup implements NextGroup {
750
751 private final GroupKey key;
752
753 public OLTPipelineGroup(GroupKey key) {
754 this.key = key;
755 }
756
757 public GroupKey key() {
758 return key;
759 }
760
761 @Override
762 public byte[] data() {
763 return appKryo.serialize(key);
764 }
765
766 }
767
768 @Override
769 public List<String> getNextMappings(NextGroup nextGroup) {
770 // TODO Implementation deferred to vendor
771 return null;
772 }
773}