blob: 8e91a998a09e4dddc0ecb6952da32906e938889d [file] [log] [blame]
Srikanth Vavilapallif5b234a2015-04-21 13:04:13 -07001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.driver.pipeline;
17
18import static org.onlab.util.Tools.groupedThreads;
19import static org.slf4j.LoggerFactory.getLogger;
20
21import com.google.common.cache.Cache;
22import com.google.common.cache.CacheBuilder;
23import com.google.common.cache.RemovalCause;
24import com.google.common.cache.RemovalNotification;
25
26import org.onlab.osgi.ServiceDirectory;
27import org.onlab.packet.Ethernet;
28import org.onlab.packet.VlanId;
29import org.onlab.util.KryoNamespace;
30import org.onosproject.core.ApplicationId;
31import org.onosproject.core.CoreService;
32import org.onosproject.net.DeviceId;
33import org.onosproject.net.PortNumber;
34import org.onosproject.net.behaviour.NextGroup;
35import org.onosproject.net.behaviour.Pipeliner;
36import org.onosproject.net.behaviour.PipelinerContext;
37import org.onosproject.net.driver.AbstractHandlerBehaviour;
38import org.onosproject.net.flow.DefaultFlowRule;
39import org.onosproject.net.flow.DefaultTrafficSelector;
40import org.onosproject.net.flow.DefaultTrafficTreatment;
41import org.onosproject.net.flow.FlowRule;
42import org.onosproject.net.flow.FlowRuleOperations;
43import org.onosproject.net.flow.FlowRuleOperationsContext;
44import org.onosproject.net.flow.FlowRuleService;
45import org.onosproject.net.flow.TrafficSelector;
46import org.onosproject.net.flow.TrafficTreatment;
47import org.onosproject.net.flow.criteria.Criteria;
48import org.onosproject.net.flow.criteria.Criterion;
49import org.onosproject.net.flow.criteria.EthCriterion;
50import org.onosproject.net.flow.criteria.EthTypeCriterion;
51import org.onosproject.net.flow.criteria.IPCriterion;
52import org.onosproject.net.flow.criteria.MplsCriterion;
53import org.onosproject.net.flow.criteria.PortCriterion;
54import org.onosproject.net.flow.criteria.VlanIdCriterion;
55import org.onosproject.net.flow.instructions.Instruction;
56import org.onosproject.net.flowobjective.FilteringObjective;
57import org.onosproject.net.flowobjective.FlowObjectiveStore;
58import org.onosproject.net.flowobjective.ForwardingObjective;
59import org.onosproject.net.flowobjective.NextObjective;
60import org.onosproject.net.flowobjective.Objective;
61import org.onosproject.net.flowobjective.ObjectiveError;
62import org.onosproject.net.group.DefaultGroupBucket;
63import org.onosproject.net.group.DefaultGroupDescription;
64import org.onosproject.net.group.DefaultGroupKey;
65import org.onosproject.net.group.Group;
66import org.onosproject.net.group.GroupBucket;
67import org.onosproject.net.group.GroupBuckets;
68import org.onosproject.net.group.GroupDescription;
69import org.onosproject.net.group.GroupEvent;
70import org.onosproject.net.group.GroupKey;
71import org.onosproject.net.group.GroupListener;
72import org.onosproject.net.group.GroupService;
73import org.slf4j.Logger;
74
75import java.util.ArrayList;
sangho834e4b02015-05-01 09:38:25 -070076import java.util.Arrays;
Srikanth Vavilapallif5b234a2015-04-21 13:04:13 -070077import java.util.Collection;
78import java.util.Collections;
79import java.util.List;
80import java.util.Set;
81import java.util.concurrent.Executors;
82import java.util.concurrent.ScheduledExecutorService;
83import java.util.concurrent.TimeUnit;
84import java.util.stream.Collectors;
85
86/**
87 * Driver for SPRING-OPEN pipeline.
88 */
89public class SpringOpenTTP extends AbstractHandlerBehaviour
90 implements Pipeliner {
91
92 // Default table ID - compatible with CpqD switch
93 private static final int TABLE_VLAN = 0;
94 private static final int TABLE_TMAC = 1;
95 private static final int TABLE_IPV4_UNICAST = 2;
96 private static final int TABLE_MPLS = 3;
97 private static final int TABLE_ACL = 5;
98
99 /**
100 * Set the default values. These variables will get overwritten based on the
101 * switch vendor type
102 */
103 protected int vlanTableId = TABLE_VLAN;
104 protected int tmacTableId = TABLE_TMAC;
105 protected int ipv4UnicastTableId = TABLE_IPV4_UNICAST;
106 protected int mplsTableId = TABLE_MPLS;
107 protected int aclTableId = TABLE_ACL;
108
109 protected final Logger log = getLogger(getClass());
110
111 private ServiceDirectory serviceDirectory;
112 private FlowRuleService flowRuleService;
113 private CoreService coreService;
114 protected GroupService groupService;
115 protected FlowObjectiveStore flowObjectiveStore;
116 protected DeviceId deviceId;
117 private ApplicationId appId;
118
119 private Cache<GroupKey, NextObjective> pendingGroups;
120
121 private ScheduledExecutorService groupChecker = Executors
122 .newScheduledThreadPool(2,
123 groupedThreads("onos/pipeliner",
124 "spring-open-%d"));
125 protected KryoNamespace appKryo = new KryoNamespace.Builder()
126 .register(GroupKey.class).register(DefaultGroupKey.class)
127 .register(SegmentRoutingGroup.class).register(byte[].class).build();
128
129 @Override
130 public void init(DeviceId deviceId, PipelinerContext context) {
131 this.serviceDirectory = context.directory();
132 this.deviceId = deviceId;
133
134 pendingGroups = CacheBuilder
135 .newBuilder()
136 .expireAfterWrite(20, TimeUnit.SECONDS)
137 .removalListener((RemovalNotification<GroupKey, NextObjective> notification) -> {
138 if (notification.getCause() == RemovalCause.EXPIRED) {
139 fail(notification.getValue(),
140 ObjectiveError.GROUPINSTALLATIONFAILED);
141 }
142 }).build();
143
144 groupChecker.scheduleAtFixedRate(new GroupChecker(), 0, 500,
145 TimeUnit.MILLISECONDS);
146
147 coreService = serviceDirectory.get(CoreService.class);
148 flowRuleService = serviceDirectory.get(FlowRuleService.class);
149 groupService = serviceDirectory.get(GroupService.class);
150 flowObjectiveStore = context.store();
151
152 groupService.addListener(new InnerGroupListener());
153
154 appId = coreService
155 .registerApplication("org.onosproject.driver.SpringOpenTTP");
156
157 setTableMissEntries();
158 log.info("Spring Open TTP driver initialized");
159 }
160
161 @Override
162 public void filter(FilteringObjective filteringObjective) {
163 if (filteringObjective.type() == FilteringObjective.Type.PERMIT) {
164 log.debug("processing PERMIT filter objective");
165 processFilter(filteringObjective,
166 filteringObjective.op() == Objective.Operation.ADD,
167 filteringObjective.appId());
168 } else {
169 log.debug("filter objective other than PERMIT not supported");
170 fail(filteringObjective, ObjectiveError.UNSUPPORTED);
171 }
172 }
173
174 @Override
175 public void forward(ForwardingObjective fwd) {
176 Collection<FlowRule> rules;
177 FlowRuleOperations.Builder flowBuilder = FlowRuleOperations.builder();
178
179 rules = processForward(fwd);
180 switch (fwd.op()) {
181 case ADD:
182 rules.stream().filter(rule -> rule != null)
183 .forEach(flowBuilder::add);
184 break;
185 case REMOVE:
186 rules.stream().filter(rule -> rule != null)
187 .forEach(flowBuilder::remove);
188 break;
189 default:
190 fail(fwd, ObjectiveError.UNKNOWN);
191 log.warn("Unknown forwarding type {}", fwd.op());
192 }
193
194 flowRuleService.apply(flowBuilder
195 .build(new FlowRuleOperationsContext() {
196 @Override
197 public void onSuccess(FlowRuleOperations ops) {
198 pass(fwd);
199 }
200
201 @Override
202 public void onError(FlowRuleOperations ops) {
203 fail(fwd, ObjectiveError.FLOWINSTALLATIONFAILED);
204 }
205 }));
206
207 }
208
209 @Override
210 public void next(NextObjective nextObjective) {
sangho834e4b02015-05-01 09:38:25 -0700211
212 if (nextObjective.op() == Objective.Operation.REMOVE) {
213 if (nextObjective.next() == null) {
214 removeGroup(nextObjective);
215 } else {
216 removeBucketFromGroup(nextObjective);
Srikanth Vavilapallif5b234a2015-04-21 13:04:13 -0700217 }
sangho834e4b02015-05-01 09:38:25 -0700218 } else if (nextObjective.op() == Objective.Operation.ADD) {
219 NextGroup nextGroup = flowObjectiveStore.getNextGroup(nextObjective.id());
220 if (nextGroup != null) {
221 addBucketToGroup(nextObjective);
222 } else {
223 addGroup(nextObjective);
Srikanth Vavilapallif5b234a2015-04-21 13:04:13 -0700224 }
sangho834e4b02015-05-01 09:38:25 -0700225 } else {
226 log.warn("Unsupported operation {}", nextObjective.op());
Srikanth Vavilapallif5b234a2015-04-21 13:04:13 -0700227 }
228
229 }
230
sangho834e4b02015-05-01 09:38:25 -0700231 private void removeGroup(NextObjective nextObjective) {
232 final GroupKey key = new DefaultGroupKey(
233 appKryo.serialize(nextObjective.id()));
234 groupService.removeGroup(deviceId, key, appId);
235 }
236
237 private void addGroup(NextObjective nextObjective) {
238 switch (nextObjective.type()) {
239 case SIMPLE:
240 log.debug("processing SIMPLE next objective");
241 Collection<TrafficTreatment> treatments = nextObjective.next();
242 if (treatments.size() == 1) {
243 TrafficTreatment treatment = treatments.iterator().next();
244 GroupBucket bucket = DefaultGroupBucket
245 .createIndirectGroupBucket(treatment);
246 final GroupKey key = new DefaultGroupKey(
247 appKryo.serialize(nextObjective
248 .id()));
249 GroupDescription groupDescription = new DefaultGroupDescription(
250 deviceId,
251 GroupDescription.Type.INDIRECT,
252 new GroupBuckets(
253 Collections.singletonList(bucket)),
254 key,
255 nextObjective.appId());
256 groupService.addGroup(groupDescription);
257 pendingGroups.put(key, nextObjective);
258 }
259 break;
260 case HASHED:
261 log.debug("processing HASHED next objective");
262 List<GroupBucket> buckets = nextObjective
263 .next()
264 .stream()
265 .map((treatment) -> DefaultGroupBucket
266 .createSelectGroupBucket(treatment))
267 .collect(Collectors.toList());
268 if (!buckets.isEmpty()) {
269 final GroupKey key = new DefaultGroupKey(
270 appKryo.serialize(nextObjective
271 .id()));
272 GroupDescription groupDescription = new DefaultGroupDescription(
273 deviceId,
274 GroupDescription.Type.SELECT,
275 new GroupBuckets(buckets),
276 key,
277 nextObjective.appId());
278 groupService.addGroup(groupDescription);
279 pendingGroups.put(key, nextObjective);
280 }
281 break;
282 case BROADCAST:
283 case FAILOVER:
284 log.debug("BROADCAST and FAILOVER next objectives not supported");
285 fail(nextObjective, ObjectiveError.UNSUPPORTED);
286 log.warn("Unsupported next objective type {}", nextObjective.type());
287 break;
288 default:
289 fail(nextObjective, ObjectiveError.UNKNOWN);
290 log.warn("Unknown next objective type {}", nextObjective.type());
291 }
292 }
293
294 private void addBucketToGroup(NextObjective nextObjective) {
295 Collection<TrafficTreatment> treatments = nextObjective.next();
296 TrafficTreatment treatment = treatments.iterator().next();
297 final GroupKey key = new DefaultGroupKey(
298 appKryo.serialize(nextObjective
299 .id()));
300 Group group = groupService.getGroup(deviceId, key);
301 if (group == null) {
302 log.warn("Group is not found in {} for {}", deviceId, key);
303 return;
304 }
305 GroupBucket bucket;
306 if (group.type() == GroupDescription.Type.INDIRECT) {
307 bucket = DefaultGroupBucket.createIndirectGroupBucket(treatment);
308 } else if (group.type() == GroupDescription.Type.SELECT) {
309 bucket = DefaultGroupBucket.createSelectGroupBucket(treatment);
310 } else {
311 log.warn("Unsupported Group type {}", group.type());
312 return;
313 }
314 GroupBuckets bucketsToAdd = new GroupBuckets(Arrays.asList(bucket));
315 groupService.addBucketsToGroup(deviceId, key, bucketsToAdd, key, appId);
316 }
317
318 private void removeBucketFromGroup(NextObjective nextObjective) {
319 NextGroup nextGroup = flowObjectiveStore.getNextGroup(nextObjective.id());
320 if (nextGroup != null) {
321 Collection<TrafficTreatment> treatments = nextObjective.next();
322 TrafficTreatment treatment = treatments.iterator().next();
323 final GroupKey key = new DefaultGroupKey(
324 appKryo.serialize(nextObjective
325 .id()));
326 Group group = groupService.getGroup(deviceId, key);
327 if (group == null) {
328 log.warn("Group is not found in {} for {}", deviceId, key);
329 return;
330 }
331 GroupBucket bucket;
332 if (group.type() == GroupDescription.Type.INDIRECT) {
333 bucket = DefaultGroupBucket.createIndirectGroupBucket(treatment);
334 } else if (group.type() == GroupDescription.Type.SELECT) {
335 bucket = DefaultGroupBucket.createSelectGroupBucket(treatment);
336 } else {
337 log.warn("Unsupported Group type {}", group.type());
338 return;
339 }
340 GroupBuckets removeBuckets = new GroupBuckets(Arrays.asList(bucket));
341 groupService.removeBucketsFromGroup(deviceId, key, removeBuckets, key, appId);
342 }
343 }
344
Srikanth Vavilapallif5b234a2015-04-21 13:04:13 -0700345 private Collection<FlowRule> processForward(ForwardingObjective fwd) {
346 switch (fwd.flag()) {
347 case SPECIFIC:
348 return processSpecific(fwd);
349 case VERSATILE:
350 return processVersatile(fwd);
351 default:
352 fail(fwd, ObjectiveError.UNKNOWN);
353 log.warn("Unknown forwarding flag {}", fwd.flag());
354 }
355 return Collections.emptySet();
356 }
357
358 private Collection<FlowRule> processVersatile(ForwardingObjective fwd) {
359 fail(fwd, ObjectiveError.UNSUPPORTED);
360 return Collections.emptySet();
361 }
362
363 protected Collection<FlowRule> processSpecific(ForwardingObjective fwd) {
364 log.debug("Processing specific");
365 TrafficSelector selector = fwd.selector();
366 EthTypeCriterion ethType = (EthTypeCriterion) selector
367 .getCriterion(Criterion.Type.ETH_TYPE);
368 if ((ethType == null) ||
369 ((((short) ethType.ethType()) != Ethernet.TYPE_IPV4) &&
370 (((short) ethType.ethType()) != Ethernet.MPLS_UNICAST))) {
371 log.debug("processSpecific: Unsupported "
372 + "forwarding objective criteraia");
373 fail(fwd, ObjectiveError.UNSUPPORTED);
374 return Collections.emptySet();
375 }
376
377 TrafficSelector.Builder filteredSelectorBuilder =
378 DefaultTrafficSelector.builder();
379 int forTableId = -1;
380 if (((short) ethType.ethType()) == Ethernet.TYPE_IPV4) {
381 filteredSelectorBuilder = filteredSelectorBuilder
382 .matchEthType(Ethernet.TYPE_IPV4)
383 .matchIPDst(((IPCriterion) selector
384 .getCriterion(Criterion.Type.IPV4_DST))
385 .ip());
386 forTableId = ipv4UnicastTableId;
387 log.debug("processing IPv4 specific forwarding objective");
388 } else {
389 filteredSelectorBuilder = filteredSelectorBuilder
390 .matchEthType(Ethernet.MPLS_UNICAST)
391 .matchMplsLabel(((MplsCriterion)
392 selector.getCriterion(Criterion.Type.MPLS_LABEL)).label());
393 //TODO: Add Match for BoS
394 //if (selector.getCriterion(Criterion.Type.MPLS_BOS) != null) {
395 //}
396 forTableId = mplsTableId;
397 log.debug("processing MPLS specific forwarding objective");
398 }
399
400 TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment
401 .builder();
402 if (fwd.treatment() != null) {
403 for (Instruction i : fwd.treatment().allInstructions()) {
404 treatmentBuilder.add(i);
405 }
406 }
407
408 //TODO: Analyze the forwarding objective here to make
409 //device specific decision such as no ECMP groups in Dell
410 //switches.
411 if (fwd.nextId() != null) {
412 NextGroup next = flowObjectiveStore.getNextGroup(fwd.nextId());
413
414 if (next != null) {
415 GroupKey key = appKryo.deserialize(next.data());
416
417 Group group = groupService.getGroup(deviceId, key);
418
419 if (group == null) {
420 log.warn("The group left!");
421 fail(fwd, ObjectiveError.GROUPMISSING);
422 return Collections.emptySet();
423 }
424 treatmentBuilder.group(group.id());
425 log.debug("Adding OUTGROUP action");
426 }
427 }
428
429 TrafficSelector filteredSelector = filteredSelectorBuilder.build();
430 TrafficTreatment treatment = treatmentBuilder.transition(aclTableId)
431 .build();
432
433 FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
434 .fromApp(fwd.appId()).withPriority(fwd.priority())
435 .forDevice(deviceId).withSelector(filteredSelector)
436 .withTreatment(treatment);
437
438 if (fwd.permanent()) {
439 ruleBuilder.makePermanent();
440 } else {
441 ruleBuilder.makeTemporary(fwd.timeout());
442 }
443
444 ruleBuilder.forTable(forTableId);
445 return Collections.singletonList(ruleBuilder.build());
446
447 }
448
449 protected List<FlowRule> processEthDstFilter(Criterion c,
450 FilteringObjective filt,
451 ApplicationId applicationId) {
452 List<FlowRule> rules = new ArrayList<FlowRule>();
453 EthCriterion e = (EthCriterion) c;
454 TrafficSelector.Builder selectorIp = DefaultTrafficSelector
455 .builder();
456 TrafficTreatment.Builder treatmentIp = DefaultTrafficTreatment
457 .builder();
458 selectorIp.matchEthDst(e.mac());
459 selectorIp.matchEthType(Ethernet.TYPE_IPV4);
460 treatmentIp.transition(ipv4UnicastTableId);
461 FlowRule ruleIp = DefaultFlowRule.builder().forDevice(deviceId)
462 .withSelector(selectorIp.build())
463 .withTreatment(treatmentIp.build())
464 .withPriority(filt.priority()).fromApp(applicationId)
465 .makePermanent().forTable(tmacTableId).build();
466 log.debug("adding IP ETH rule for MAC: {}", e.mac());
467 rules.add(ruleIp);
468
469 TrafficSelector.Builder selectorMpls = DefaultTrafficSelector
470 .builder();
471 TrafficTreatment.Builder treatmentMpls = DefaultTrafficTreatment
472 .builder();
473 selectorMpls.matchEthDst(e.mac());
474 selectorMpls.matchEthType(Ethernet.MPLS_UNICAST);
475 treatmentMpls.transition(mplsTableId);
476 FlowRule ruleMpls = DefaultFlowRule.builder()
477 .forDevice(deviceId).withSelector(selectorMpls.build())
478 .withTreatment(treatmentMpls.build())
479 .withPriority(filt.priority()).fromApp(applicationId)
480 .makePermanent().forTable(tmacTableId).build();
481 log.debug("adding MPLS ETH rule for MAC: {}", e.mac());
482 rules.add(ruleMpls);
483
484 return rules;
485 }
486
487 private void processFilter(FilteringObjective filt, boolean install,
488 ApplicationId applicationId) {
489 // This driver only processes filtering criteria defined with switch
490 // ports as the key
491 PortCriterion p;
492 if (!filt.key().equals(Criteria.dummy())
493 && filt.key().type() == Criterion.Type.IN_PORT) {
494 p = (PortCriterion) filt.key();
495 } else {
496 log.warn("No key defined in filtering objective from app: {}. Not"
497 + "processing filtering objective", applicationId);
498 fail(filt, ObjectiveError.UNKNOWN);
499 return;
500 }
501 // convert filtering conditions for switch-intfs into flowrules
502 FlowRuleOperations.Builder ops = FlowRuleOperations.builder();
503 for (Criterion c : filt.conditions()) {
504 if (c.type() == Criterion.Type.ETH_DST) {
505 for (FlowRule rule : processEthDstFilter(c,
506 filt,
507 applicationId)) {
508 ops = install ? ops.add(rule) : ops.remove(rule);
509 }
510 } else if (c.type() == Criterion.Type.VLAN_VID) {
511 VlanIdCriterion v = (VlanIdCriterion) c;
512 log.debug("adding rule for VLAN: {}", v.vlanId());
513 TrafficSelector.Builder selector = DefaultTrafficSelector
514 .builder();
515 TrafficTreatment.Builder treatment = DefaultTrafficTreatment
516 .builder();
517 if (v.vlanId() != VlanId.NONE) {
518 selector.matchVlanId(v.vlanId());
519 selector.matchInPort(p.port());
520 treatment.deferred().popVlan();
521 }
522 treatment.transition(tmacTableId);
523 FlowRule rule = DefaultFlowRule.builder().forDevice(deviceId)
524 .withSelector(selector.build())
525 .withTreatment(treatment.build())
526 .withPriority(filt.priority()).fromApp(applicationId)
527 .makePermanent().forTable(vlanTableId).build();
528 ops = install ? ops.add(rule) : ops.remove(rule);
529 } else if (c.type() == Criterion.Type.IPV4_DST) {
530 IPCriterion ip = (IPCriterion) c;
531 log.debug("adding rule for IP: {}", ip.ip());
532 TrafficSelector.Builder selector = DefaultTrafficSelector
533 .builder();
534 TrafficTreatment.Builder treatment = DefaultTrafficTreatment
535 .builder();
536 selector.matchEthType(Ethernet.TYPE_IPV4);
537 selector.matchIPDst(ip.ip());
538 treatment.transition(aclTableId);
539 FlowRule rule = DefaultFlowRule.builder().forDevice(deviceId)
540 .withSelector(selector.build())
541 .withTreatment(treatment.build())
542 .withPriority(filt.priority()).fromApp(applicationId)
543 .makePermanent().forTable(ipv4UnicastTableId).build();
544 ops = install ? ops.add(rule) : ops.remove(rule);
545 } else {
546 log.warn("Driver does not currently process filtering condition"
547 + " of type: {}", c.type());
548 fail(filt, ObjectiveError.UNSUPPORTED);
549 }
550 }
551 // apply filtering flow rules
552 flowRuleService.apply(ops.build(new FlowRuleOperationsContext() {
553 @Override
554 public void onSuccess(FlowRuleOperations ops) {
555 pass(filt);
556 log.info("Provisioned tables for segment router");
557 }
558
559 @Override
560 public void onError(FlowRuleOperations ops) {
561 fail(filt, ObjectiveError.FLOWINSTALLATIONFAILED);
562 log.info("Failed to provision tables for segment router");
563 }
564 }));
565 }
566
567 protected void setTableMissEntries() {
568 // set all table-miss-entries
569 populateTableMissEntry(vlanTableId, true, false, false, -1);
570 populateTableMissEntry(tmacTableId, true, false, false, -1);
571 populateTableMissEntry(ipv4UnicastTableId, false, true, true,
572 aclTableId);
573 populateTableMissEntry(mplsTableId, false, true, true, aclTableId);
574 populateTableMissEntry(aclTableId, false, false, false, -1);
575 }
576
577 protected void populateTableMissEntry(int tableToAdd,
578 boolean toControllerNow,
579 boolean toControllerWrite,
580 boolean toTable, int tableToSend) {
581 TrafficSelector selector = DefaultTrafficSelector.builder().build();
582 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
583
584 if (toControllerNow) {
585 tBuilder.setOutput(PortNumber.CONTROLLER);
586 }
587
588 if (toControllerWrite) {
589 tBuilder.deferred().setOutput(PortNumber.CONTROLLER);
590 }
591
592 if (toTable) {
593 tBuilder.transition(tableToSend);
594 }
595
596 FlowRule flow = DefaultFlowRule.builder().forDevice(deviceId)
597 .withSelector(selector).withTreatment(tBuilder.build())
598 .withPriority(0).fromApp(appId).makePermanent()
599 .forTable(tableToAdd).build();
600
601 flowRuleService.applyFlowRules(flow);
602 }
603
604 private void pass(Objective obj) {
605 if (obj.context().isPresent()) {
606 obj.context().get().onSuccess(obj);
607 }
608 }
609
610 protected void fail(Objective obj, ObjectiveError error) {
611 if (obj.context().isPresent()) {
612 obj.context().get().onError(obj, error);
613 }
614 }
615
616 private class InnerGroupListener implements GroupListener {
617 @Override
618 public void event(GroupEvent event) {
619 if (event.type() == GroupEvent.Type.GROUP_ADDED) {
620 GroupKey key = event.subject().appCookie();
621
622 NextObjective obj = pendingGroups.getIfPresent(key);
623 if (obj != null) {
624 flowObjectiveStore
625 .putNextGroup(obj.id(),
626 new SegmentRoutingGroup(key));
627 pass(obj);
628 pendingGroups.invalidate(key);
629 }
630 }
631 }
632 }
633
634 private class GroupChecker implements Runnable {
635
636 @Override
637 public void run() {
638 Set<GroupKey> keys = pendingGroups
639 .asMap()
640 .keySet()
641 .stream()
642 .filter(key -> groupService.getGroup(deviceId, key) != null)
643 .collect(Collectors.toSet());
644
645 keys.stream()
646 .forEach(key -> {
647 NextObjective obj = pendingGroups
648 .getIfPresent(key);
649 if (obj == null) {
650 return;
651 }
652 pass(obj);
653 pendingGroups.invalidate(key);
654 flowObjectiveStore.putNextGroup(obj.id(),
655 new SegmentRoutingGroup(
656 key));
657 });
658 }
659 }
660
661 private class SegmentRoutingGroup implements NextGroup {
662
663 private final GroupKey key;
664
665 public SegmentRoutingGroup(GroupKey key) {
666 this.key = key;
667 }
668
669 public GroupKey key() {
670 return key;
671 }
672
673 @Override
674 public byte[] data() {
675 return appKryo.serialize(key);
676 }
677
678 }
679}