blob: 667d8cf087e645aa0933151300974c93d4f02e99 [file] [log] [blame]
ke hana1cb2512016-09-29 15:15:59 +08001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
ke hana1cb2512016-09-29 15:15:59 +08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.driver.pipeline;
17
18import com.google.common.cache.Cache;
19import com.google.common.cache.CacheBuilder;
20import com.google.common.cache.RemovalCause;
21import com.google.common.cache.RemovalNotification;
22import com.google.common.collect.Lists;
23import org.apache.commons.lang3.tuple.ImmutablePair;
24import org.apache.commons.lang3.tuple.Pair;
25import org.onlab.osgi.ServiceDirectory;
26import org.onlab.packet.EthType;
27import org.onlab.packet.IPv4;
28import org.onlab.packet.VlanId;
29import org.onlab.util.KryoNamespace;
30import org.onosproject.core.ApplicationId;
31import org.onosproject.core.CoreService;
32import org.onosproject.net.DeviceId;
33import org.onosproject.net.PortNumber;
34import org.onosproject.net.behaviour.NextGroup;
35import org.onosproject.net.behaviour.Pipeliner;
36import org.onosproject.net.behaviour.PipelinerContext;
37import org.onosproject.net.driver.AbstractHandlerBehaviour;
38import org.onosproject.net.flow.DefaultFlowRule;
39import org.onosproject.net.flow.DefaultTrafficSelector;
40import org.onosproject.net.flow.DefaultTrafficTreatment;
41import org.onosproject.net.flow.FlowRule;
42import org.onosproject.net.flow.FlowRuleOperations;
43import org.onosproject.net.flow.FlowRuleOperationsContext;
44import org.onosproject.net.flow.FlowRuleService;
45import org.onosproject.net.flow.TrafficSelector;
46import org.onosproject.net.flow.TrafficTreatment;
47import org.onosproject.net.flow.criteria.Criteria;
48import org.onosproject.net.flow.criteria.Criterion;
49import org.onosproject.net.flow.criteria.EthTypeCriterion;
50import org.onosproject.net.flow.criteria.IPCriterion;
51import org.onosproject.net.flow.criteria.IPProtocolCriterion;
52import org.onosproject.net.flow.criteria.PortCriterion;
53import org.onosproject.net.flow.criteria.VlanIdCriterion;
54import org.onosproject.net.flow.instructions.Instruction;
55import org.onosproject.net.flow.instructions.Instructions;
56import org.onosproject.net.flow.instructions.L2ModificationInstruction;
57import org.onosproject.net.flowobjective.FilteringObjective;
58import org.onosproject.net.flowobjective.FlowObjectiveStore;
59import org.onosproject.net.flowobjective.ForwardingObjective;
60import org.onosproject.net.flowobjective.NextObjective;
61import org.onosproject.net.flowobjective.Objective;
62import org.onosproject.net.flowobjective.ObjectiveError;
63import org.onosproject.net.group.DefaultGroupBucket;
64import org.onosproject.net.group.DefaultGroupDescription;
65import org.onosproject.net.group.DefaultGroupKey;
66import org.onosproject.net.group.Group;
67import org.onosproject.net.group.GroupBucket;
68import org.onosproject.net.group.GroupBuckets;
69import org.onosproject.net.group.GroupDescription;
70import org.onosproject.net.group.GroupEvent;
71import org.onosproject.net.group.GroupKey;
72import org.onosproject.net.group.GroupListener;
73import org.onosproject.net.group.GroupService;
74import org.onosproject.store.serializers.KryoNamespaces;
75import org.onosproject.store.service.StorageService;
76import org.slf4j.Logger;
77
78import java.util.Collection;
79import java.util.Collections;
80import java.util.List;
81import java.util.Optional;
82import java.util.concurrent.TimeUnit;
83import java.util.stream.Collectors;
84import java.util.Iterator;
85
86
87import static org.slf4j.LoggerFactory.getLogger;
88
89/**
90 * Pipeliner for OLT device.
91 */
92
93public class NokiaOltPipeline extends AbstractHandlerBehaviour implements Pipeliner {
94
95 private static final Integer QQ_TABLE = 1;
96 private static final short MCAST_VLAN = 4000;
97 private static final String OLTCOOKIES = "olt-cookies-must-be-unique";
ke hanb1fbddb2016-12-15 10:56:32 +080098 private static final int EAPOL_FLOW_PRIORITY = 1200;
ke hana1cb2512016-09-29 15:15:59 +080099 private final Logger log = getLogger(getClass());
100
101 private ServiceDirectory serviceDirectory;
102 private FlowRuleService flowRuleService;
103 private GroupService groupService;
104 private CoreService coreService;
105 private StorageService storageService;
106
107 private DeviceId deviceId;
108 private ApplicationId appId;
109
110
111 protected FlowObjectiveStore flowObjectiveStore;
112
113 private Cache<GroupKey, NextObjective> pendingGroups;
114
115 protected static KryoNamespace appKryo = new KryoNamespace.Builder()
116 .register(KryoNamespaces.API)
117 .register(GroupKey.class)
118 .register(DefaultGroupKey.class)
119 .register(OLTPipelineGroup.class)
120 .build("OltPipeline");
121 @Override
122 public void init(DeviceId deviceId, PipelinerContext context) {
123 log.debug("Initiate OLT pipeline");
124 this.serviceDirectory = context.directory();
125 this.deviceId = deviceId;
126
127 flowRuleService = serviceDirectory.get(FlowRuleService.class);
128 coreService = serviceDirectory.get(CoreService.class);
129 groupService = serviceDirectory.get(GroupService.class);
130 flowObjectiveStore = context.store();
131 storageService = serviceDirectory.get(StorageService.class);
132
133 appId = coreService.registerApplication(
134 "org.onosproject.driver.OLTPipeline");
135
136
137 pendingGroups = CacheBuilder.newBuilder()
138 .expireAfterWrite(20, TimeUnit.SECONDS)
139 .removalListener((RemovalNotification<GroupKey, NextObjective> notification) -> {
140 if (notification.getCause() == RemovalCause.EXPIRED) {
141 fail(notification.getValue(), ObjectiveError.GROUPINSTALLATIONFAILED);
142 }
143 }).build();
144
145 groupService.addListener(new InnerGroupListener());
146
147 }
148
149 @Override
150 public void filter(FilteringObjective filter) {
151 Instructions.OutputInstruction output;
152
153 if (filter.meta() != null && !filter.meta().immediate().isEmpty()) {
154 output = (Instructions.OutputInstruction) filter.meta().immediate().stream()
155 .filter(t -> t.type().equals(Instruction.Type.OUTPUT))
156 .limit(1)
157 .findFirst().get();
158
159 if (output == null || !output.port().equals(PortNumber.CONTROLLER)) {
160 log.error("OLT can only filter packet to controller");
161 fail(filter, ObjectiveError.UNSUPPORTED);
162 return;
163 }
164 } else {
165 fail(filter, ObjectiveError.BADPARAMS);
166 return;
167 }
168
169 if (filter.key().type() != Criterion.Type.IN_PORT) {
170 fail(filter, ObjectiveError.BADPARAMS);
171 return;
172 }
173
174 EthTypeCriterion ethType = (EthTypeCriterion)
175 filterForCriterion(filter.conditions(), Criterion.Type.ETH_TYPE);
176
177 if (ethType == null) {
178 fail(filter, ObjectiveError.BADPARAMS);
179 return;
180 }
181
182 if (ethType.ethType().equals(EthType.EtherType.EAPOL.ethType())) {
183 provisionEapol(filter, ethType, output);
184 } else if (ethType.ethType().equals(EthType.EtherType.IPV4.ethType())) {
185 IPProtocolCriterion ipProto = (IPProtocolCriterion)
186 filterForCriterion(filter.conditions(), Criterion.Type.IP_PROTO);
187 if (ipProto.protocol() == IPv4.PROTOCOL_IGMP) {
188 provisionIgmp(filter, ethType, ipProto, output);
189 } else {
190 log.error("OLT can only filter igmp");
191 fail(filter, ObjectiveError.UNSUPPORTED);
192 }
193 } else {
194 log.error("OLT can only filter eapol and igmp");
195 fail(filter, ObjectiveError.UNSUPPORTED);
196 }
197
198 }
199
200 private void installObjective(FlowRule.Builder ruleBuilder, Objective objective) {
201 FlowRuleOperations.Builder flowBuilder = FlowRuleOperations.builder();
202 switch (objective.op()) {
203
204 case ADD:
205 flowBuilder.add(ruleBuilder.build());
206 break;
207 case REMOVE:
208 flowBuilder.remove(ruleBuilder.build());
209 break;
210 default:
211 log.warn("Unknown operation {}", objective.op());
212 }
213
214 flowRuleService.apply(flowBuilder.build(new FlowRuleOperationsContext() {
215 @Override
216 public void onSuccess(FlowRuleOperations ops) {
217 objective.context().ifPresent(context -> context.onSuccess(objective));
218 }
219
220 @Override
221 public void onError(FlowRuleOperations ops) {
222 objective.context()
223 .ifPresent(context -> context.onError(objective, ObjectiveError.FLOWINSTALLATIONFAILED));
224 }
225 }));
226 }
227
228 @Override
229 public void forward(ForwardingObjective fwd) {
230
231 if (checkForMulticast(fwd)) {
232 processMulticastRule(fwd);
233 return;
234 }
235
ke han9649cdd2017-03-20 14:53:46 +0800236 if (checkForEAPOL(fwd)) {
237 log.warn("Discarding EAPOL flow which is not supported on this pipeline");
238 return;
239 }
240
ke hana1cb2512016-09-29 15:15:59 +0800241 TrafficTreatment treatment = fwd.treatment();
242
243 List<Instruction> instructions = treatment.allInstructions();
244
245 Optional<Instruction> vlanIntruction = instructions.stream()
246 .filter(i -> i.type() == Instruction.Type.L2MODIFICATION)
247 .filter(i -> ((L2ModificationInstruction) i).subtype() ==
248 L2ModificationInstruction.L2SubType.VLAN_PUSH ||
249 ((L2ModificationInstruction) i).subtype() ==
250 L2ModificationInstruction.L2SubType.VLAN_POP)
251 .findAny();
252
253 if (vlanIntruction.isPresent()) {
254 L2ModificationInstruction vlanIns =
255 (L2ModificationInstruction) vlanIntruction.get();
256
257 if (vlanIns.subtype() == L2ModificationInstruction.L2SubType.VLAN_PUSH) {
258 installUpstreamRules(fwd);
259 } else if (vlanIns.subtype() == L2ModificationInstruction.L2SubType.VLAN_POP) {
260 installDownstreamRules(fwd);
261 } else {
262 log.error("Unknown OLT operation: {}", fwd);
263 fail(fwd, ObjectiveError.UNSUPPORTED);
264 return;
265 }
266
267 pass(fwd);
268 } else {
269 TrafficSelector selector = fwd.selector();
270
271 if (fwd.treatment() != null) {
272 // Deal with SPECIFIC and VERSATILE in the same manner.
273 FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
274 .forDevice(deviceId)
275 .withSelector(selector)
276 .fromApp(fwd.appId())
277 .withPriority(fwd.priority())
278 .withTreatment(fwd.treatment());
279
280 if (fwd.permanent()) {
281 ruleBuilder.makePermanent();
282 } else {
283 ruleBuilder.makeTemporary(fwd.timeout());
284 }
285 installObjective(ruleBuilder, fwd);
286
287 } else {
288 log.error("No treatment error: {}", fwd);
289 fail(fwd, ObjectiveError.UNSUPPORTED);
290 }
291 }
292
293 }
294
295
296 @Override
297 public void next(NextObjective nextObjective) {
298 if (nextObjective.type() != NextObjective.Type.BROADCAST) {
299 log.error("OLT only supports broadcast groups.");
300 fail(nextObjective, ObjectiveError.BADPARAMS);
301 }
302
303 if (nextObjective.next().size() != 1) {
304 log.error("OLT only supports singleton broadcast groups.");
305 fail(nextObjective, ObjectiveError.BADPARAMS);
306 }
307
308 TrafficTreatment treatment = nextObjective.next().stream().findFirst().get();
309
310
311 GroupBucket bucket = DefaultGroupBucket.createAllGroupBucket(treatment);
312 GroupKey key = new DefaultGroupKey(appKryo.serialize(nextObjective.id()));
313
314
315 pendingGroups.put(key, nextObjective);
316
317 switch (nextObjective.op()) {
318 case ADD:
319 GroupDescription groupDesc =
320 new DefaultGroupDescription(deviceId,
321 GroupDescription.Type.ALL,
322 new GroupBuckets(Collections.singletonList(bucket)),
323 key,
324 null,
325 nextObjective.appId());
326 groupService.addGroup(groupDesc);
327 break;
328 case REMOVE:
329 groupService.removeGroup(deviceId, key, nextObjective.appId());
330 break;
331 case ADD_TO_EXISTING:
332 groupService.addBucketsToGroup(deviceId, key,
333 new GroupBuckets(Collections.singletonList(bucket)),
334 key, nextObjective.appId());
335 break;
336 case REMOVE_FROM_EXISTING:
337 groupService.removeBucketsFromGroup(deviceId, key,
338 new GroupBuckets(Collections.singletonList(bucket)),
339 key, nextObjective.appId());
340 break;
341 default:
342 log.warn("Unknown next objective operation: {}", nextObjective.op());
343 }
344
345
346 }
347
348 private void processMulticastRule(ForwardingObjective fwd) {
349 if (fwd.nextId() == null) {
350 log.error("Multicast objective does not have a next id");
351 fail(fwd, ObjectiveError.BADPARAMS);
352 }
353
354 GroupKey key = getGroupForNextObjective(fwd.nextId());
355
356 if (key == null) {
357 log.error("Group for forwarding objective missing: {}", fwd);
358 fail(fwd, ObjectiveError.GROUPMISSING);
359 }
360
361 Group group = groupService.getGroup(deviceId, key);
362 TrafficTreatment treatment =
363 buildTreatment(Instructions.createGroup(group.id()));
364
365 FlowRule rule = DefaultFlowRule.builder()
366 .fromApp(fwd.appId())
367 .forDevice(deviceId)
368 .forTable(0)
369 .makePermanent()
370 .withPriority(fwd.priority())
371 .withSelector(fwd.selector())
372 .withTreatment(treatment)
373 .build();
374
375 FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
376 switch (fwd.op()) {
377
378 case ADD:
379 builder.add(rule);
380 break;
381 case REMOVE:
382 builder.remove(rule);
383 break;
384 case ADD_TO_EXISTING:
385 case REMOVE_FROM_EXISTING:
386 break;
387 default:
388 log.warn("Unknown forwarding operation: {}", fwd.op());
389 }
390
391 applyFlowRules(builder, fwd);
392
393 }
394
395 private boolean checkForMulticast(ForwardingObjective fwd) {
396
397 IPCriterion ip = (IPCriterion) filterForCriterion(fwd.selector().criteria(),
398 Criterion.Type.IPV4_DST);
399
400 if (ip == null) {
401 return false;
402 }
403
404 return ip.ip().isMulticast();
405
406 }
407
ke han9649cdd2017-03-20 14:53:46 +0800408 private boolean checkForEAPOL(ForwardingObjective fwd) {
409 EthTypeCriterion ethType = (EthTypeCriterion)
410 filterForCriterion(fwd.selector().criteria(), Criterion.Type.ETH_TYPE);
411
412 return ethType != null && ethType.ethType().equals(EthType.EtherType.EAPOL.ethType());
413 }
ke hana1cb2512016-09-29 15:15:59 +0800414 private GroupKey getGroupForNextObjective(Integer nextId) {
415 NextGroup next = flowObjectiveStore.getNextGroup(nextId);
416 return appKryo.deserialize(next.data());
417
418 }
419
420 private void installDownstreamRules(ForwardingObjective fwd) {
421 List<Pair<Instruction, Instruction>> vlanOps =
422 vlanOps(fwd,
423 L2ModificationInstruction.L2SubType.VLAN_POP);
424
425 if (vlanOps == null) {
426 return;
427 }
428
429 Instructions.OutputInstruction output = (Instructions.OutputInstruction) fetchOutput(fwd, "downstream");
430
431 if (output == null) {
432 return;
433 }
434
435 Pair<Instruction, Instruction> popAndRewrite = vlanOps.remove(0);
436
437 TrafficSelector selector = fwd.selector();
438
439 Criterion outerVlan = selector.getCriterion(Criterion.Type.VLAN_VID);
440 Criterion innerVlan = selector.getCriterion(Criterion.Type.INNER_VLAN_VID);
441 Criterion inport = selector.getCriterion(Criterion.Type.IN_PORT);
442 Criterion bullshit = Criteria.matchMetadata(output.port().toLong());
443
444 if (outerVlan == null || innerVlan == null || inport == null) {
445 log.error("Forwarding objective is underspecified: {}", fwd);
446 fail(fwd, ObjectiveError.BADPARAMS);
447 return;
448 }
449
450 Criterion innerVid = Criteria.matchVlanId(((VlanIdCriterion) innerVlan).vlanId());
451
452 FlowRule.Builder outer = DefaultFlowRule.builder()
453 .fromApp(fwd.appId())
454 .forDevice(deviceId)
455 .makePermanent()
456 .withPriority(fwd.priority())
457 .withSelector(buildSelector(inport, outerVlan, bullshit))
458 .withTreatment(buildTreatment(popAndRewrite.getLeft(),
459 Instructions.transition(QQ_TABLE)));
460
461 FlowRule.Builder inner = DefaultFlowRule.builder()
462 .fromApp(fwd.appId())
463 .forDevice(deviceId)
464 .forTable(QQ_TABLE)
465 .makePermanent()
466 .withPriority(fwd.priority())
467 .withSelector(buildSelector(inport, innerVid))
468 .withTreatment(buildTreatment(popAndRewrite.getLeft(),
469 output));
470
471 applyRules(fwd, inner, outer);
472
473 }
474
475 private boolean hasUntaggedVlanTag(TrafficSelector selector) {
476 Iterator<Criterion> iter = selector.criteria().iterator();
477
478 while (iter.hasNext()) {
479 Criterion criterion = iter.next();
480 if (criterion.type() == Criterion.Type.VLAN_VID &&
481 ((VlanIdCriterion) criterion).vlanId().toShort() == VlanId.UNTAGGED) {
482 return true;
483 }
484 }
485
486 return false;
487 }
488
489 private void installUpstreamRules(ForwardingObjective fwd) {
490 List<Pair<Instruction, Instruction>> vlanOps =
491 vlanOps(fwd,
492 L2ModificationInstruction.L2SubType.VLAN_PUSH);
493 FlowRule.Builder inner;
494
495 if (vlanOps == null) {
496 return;
497 }
498
499 Instruction output = fetchOutput(fwd, "upstream");
500
501 if (output == null) {
502 return;
503 }
504
505 Pair<Instruction, Instruction> innerPair = vlanOps.remove(0);
506
507 Pair<Instruction, Instruction> outerPair = vlanOps.remove(0);
508
509
510 if (hasUntaggedVlanTag(fwd.selector())) {
511 inner = DefaultFlowRule.builder()
512 .fromApp(fwd.appId())
513 .forDevice(deviceId)
514 .makePermanent()
515 .withPriority(fwd.priority())
516 .withSelector(fwd.selector())
517 .withTreatment(buildTreatment(innerPair.getLeft(),
518 innerPair.getRight(),
519 Instructions.transition(QQ_TABLE)));
520 } else {
521 inner = DefaultFlowRule.builder()
522 .fromApp(fwd.appId())
523 .forDevice(deviceId)
524 .makePermanent()
525 .withPriority(fwd.priority())
526 .withSelector(fwd.selector())
527 .withTreatment(buildTreatment(
528 innerPair.getRight(),
529 Instructions.transition(QQ_TABLE)));
530 }
531
532
533 PortCriterion inPort = (PortCriterion)
534 fwd.selector().getCriterion(Criterion.Type.IN_PORT);
535
536 VlanId cVlanId = ((L2ModificationInstruction.ModVlanIdInstruction)
537 innerPair.getRight()).vlanId();
538
539 FlowRule.Builder outer = DefaultFlowRule.builder()
540 .fromApp(fwd.appId())
541 .forDevice(deviceId)
542 .forTable(QQ_TABLE)
543 .makePermanent()
544 .withPriority(fwd.priority())
545 .withSelector(buildSelector(inPort,
546 Criteria.matchVlanId(cVlanId)))
547 .withTreatment(buildTreatment(outerPair.getLeft(),
548 outerPair.getRight(),
549 output));
550
551 applyRules(fwd, inner, outer);
552
553 }
554
555 private Instruction fetchOutput(ForwardingObjective fwd, String direction) {
556 Instruction output = fwd.treatment().allInstructions().stream()
557 .filter(i -> i.type() == Instruction.Type.OUTPUT)
558 .findFirst().orElse(null);
559
560 if (output == null) {
561 log.error("OLT {} rule has no output", direction);
562 fail(fwd, ObjectiveError.BADPARAMS);
563 return null;
564 }
565 return output;
566 }
567
568 private List<Pair<Instruction, Instruction>> vlanOps(ForwardingObjective fwd,
569 L2ModificationInstruction.L2SubType type) {
570
571 List<Pair<Instruction, Instruction>> vlanOps = findVlanOps(
572 fwd.treatment().allInstructions(), type);
573
574 if (vlanOps == null) {
575 String direction = type == L2ModificationInstruction.L2SubType.VLAN_POP
576 ? "downstream" : "upstream";
577 log.error("Missing vlan operations in {} forwarding: {}", direction, fwd);
578 fail(fwd, ObjectiveError.BADPARAMS);
579 return null;
580 }
581 return vlanOps;
582 }
583
584
585 private List<Pair<Instruction, Instruction>> findVlanOps(List<Instruction> instructions,
586 L2ModificationInstruction.L2SubType type) {
587
588 List<Instruction> vlanPushs = findL2Instructions(
589 type,
590 instructions);
591 List<Instruction> vlanSets = findL2Instructions(
592 L2ModificationInstruction.L2SubType.VLAN_ID,
593 instructions);
594
595 if (vlanPushs.size() != vlanSets.size()) {
596 return null;
597 }
598
599 List<Pair<Instruction, Instruction>> pairs = Lists.newArrayList();
600
601 for (int i = 0; i < vlanPushs.size(); i++) {
602 pairs.add(new ImmutablePair<>(vlanPushs.get(i), vlanSets.get(i)));
603 }
604 return pairs;
605 }
606
607 private List<Instruction> findL2Instructions(L2ModificationInstruction.L2SubType subType,
608 List<Instruction> actions) {
609 return actions.stream()
610 .filter(i -> i.type() == Instruction.Type.L2MODIFICATION)
611 .filter(i -> ((L2ModificationInstruction) i).subtype() == subType)
612 .collect(Collectors.toList());
613 }
614
615 private void provisionEapol(FilteringObjective filter,
616 EthTypeCriterion ethType,
617 Instructions.OutputInstruction output) {
618
619 TrafficSelector selector = buildSelector(filter.key(), ethType);
620 TrafficTreatment treatment = buildTreatment(output);
ke hanb1fbddb2016-12-15 10:56:32 +0800621 buildAndApplyRule(filter, selector, treatment, EAPOL_FLOW_PRIORITY);
ke hana1cb2512016-09-29 15:15:59 +0800622
623 }
624
625 private void provisionIgmp(FilteringObjective filter, EthTypeCriterion ethType,
626 IPProtocolCriterion ipProto,
627 Instructions.OutputInstruction output) {
628 TrafficSelector selector = buildSelector(filter.key(), ethType, ipProto);
629 TrafficTreatment treatment = buildTreatment(output);
630 buildAndApplyRule(filter, selector, treatment);
631 }
632
633 private void buildAndApplyRule(FilteringObjective filter, TrafficSelector selector,
634 TrafficTreatment treatment) {
ke hanb1fbddb2016-12-15 10:56:32 +0800635 buildAndApplyRule(filter, selector, treatment, filter.priority());
636 }
637
638 private void buildAndApplyRule(FilteringObjective filter, TrafficSelector selector,
639 TrafficTreatment treatment, int priority) {
ke hana1cb2512016-09-29 15:15:59 +0800640 FlowRule rule = DefaultFlowRule.builder()
641 .fromApp(filter.appId())
642 .forDevice(deviceId)
643 .forTable(0)
644 .makePermanent()
645 .withSelector(selector)
646 .withTreatment(treatment)
ke hanb1fbddb2016-12-15 10:56:32 +0800647 .withPriority(priority)
ke hana1cb2512016-09-29 15:15:59 +0800648 .build();
649
650 FlowRuleOperations.Builder opsBuilder = FlowRuleOperations.builder();
651
652 switch (filter.type()) {
653 case PERMIT:
654 opsBuilder.add(rule);
655 break;
656 case DENY:
657 opsBuilder.remove(rule);
658 break;
659 default:
660 log.warn("Unknown filter type : {}", filter.type());
661 fail(filter, ObjectiveError.UNSUPPORTED);
662 }
663
664 applyFlowRules(opsBuilder, filter);
665 }
666
667 private void applyRules(ForwardingObjective fwd,
668 FlowRule.Builder inner, FlowRule.Builder outer) {
669 FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
670 switch (fwd.op()) {
671 case ADD:
672 builder.add(inner.build()).add(outer.build());
673 break;
674 case REMOVE:
675 builder.remove(inner.build()).remove(outer.build());
676 break;
677 case ADD_TO_EXISTING:
678 break;
679 case REMOVE_FROM_EXISTING:
680 break;
681 default:
682 log.warn("Unknown forwarding operation: {}", fwd.op());
683 }
684
685 applyFlowRules(builder, fwd);
686 }
687
688 private void applyFlowRules(FlowRuleOperations.Builder builder,
689 Objective objective) {
690 flowRuleService.apply(builder.build(new FlowRuleOperationsContext() {
691 @Override
692 public void onSuccess(FlowRuleOperations ops) {
693 pass(objective);
694 }
695
696 @Override
697 public void onError(FlowRuleOperations ops) {
698 fail(objective, ObjectiveError.FLOWINSTALLATIONFAILED);
699 }
700 }));
701 }
702
703 private Criterion filterForCriterion(Collection<Criterion> criteria, Criterion.Type type) {
704 return criteria.stream()
705 .filter(c -> c.type().equals(type))
706 .limit(1)
707 .findFirst().orElse(null);
708 }
709
710 private TrafficSelector buildSelector(Criterion... criteria) {
711
712
713 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
714
715 for (Criterion c : criteria) {
716 sBuilder.add(c);
717 }
718
719 return sBuilder.build();
720 }
721
722 private TrafficTreatment buildTreatment(Instruction... instructions) {
723
724
725 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
726
727 for (Instruction i : instructions) {
728 tBuilder.add(i);
729 }
730
731 return tBuilder.build();
732 }
733
734
735 private void fail(Objective obj, ObjectiveError error) {
736 obj.context().ifPresent(context -> context.onError(obj, error));
737 }
738
739 private void pass(Objective obj) {
740 obj.context().ifPresent(context -> context.onSuccess(obj));
741 }
742
743
744 private class InnerGroupListener implements GroupListener {
745 @Override
746 public void event(GroupEvent event) {
747 if (event.type() == GroupEvent.Type.GROUP_ADDED || event.type() == GroupEvent.Type.GROUP_UPDATED) {
748 GroupKey key = event.subject().appCookie();
749
750 NextObjective obj = pendingGroups.getIfPresent(key);
751 if (obj != null) {
752 flowObjectiveStore.putNextGroup(obj.id(), new OLTPipelineGroup(key));
753 pass(obj);
754 pendingGroups.invalidate(key);
755 }
756 }
757 }
758 }
759
760 private static class OLTPipelineGroup implements NextGroup {
761
762 private final GroupKey key;
763
764 public OLTPipelineGroup(GroupKey key) {
765 this.key = key;
766 }
767
768 public GroupKey key() {
769 return key;
770 }
771
772 @Override
773 public byte[] data() {
774 return appKryo.serialize(key);
775 }
776
777 }
778
779 @Override
780 public List<String> getNextMappings(NextGroup nextGroup) {
781 // TODO Implementation deferred to vendor
782 return null;
783 }
784}