blob: a7be6a1da701e4c25f252cc60655b1cd40bb9dd1 [file] [log] [blame]
Saurav Dase3274c82015-05-24 17:21:56 -07001package org.onosproject.driver.pipeline;
2
3
4import com.google.common.cache.Cache;
5import com.google.common.cache.CacheBuilder;
6import com.google.common.cache.RemovalCause;
7import com.google.common.cache.RemovalNotification;
8
9import org.onlab.osgi.ServiceDirectory;
10import org.onlab.packet.Ethernet;
11import org.onlab.packet.IPv4;
12import org.onlab.packet.VlanId;
13import org.onlab.util.KryoNamespace;
14import org.onosproject.core.ApplicationId;
15import org.onosproject.core.CoreService;
16import org.onosproject.net.DeviceId;
Saurav Das6c44a632015-05-30 22:05:22 -070017import org.onosproject.net.PortNumber;
Saurav Dase3274c82015-05-24 17:21:56 -070018import org.onosproject.net.behaviour.NextGroup;
19import org.onosproject.net.behaviour.Pipeliner;
20import org.onosproject.net.behaviour.PipelinerContext;
21import org.onosproject.net.driver.AbstractHandlerBehaviour;
22import org.onosproject.net.flow.DefaultFlowRule;
23import org.onosproject.net.flow.DefaultTrafficSelector;
24import org.onosproject.net.flow.DefaultTrafficTreatment;
25import org.onosproject.net.flow.FlowRule;
26import org.onosproject.net.flow.FlowRuleOperations;
27import org.onosproject.net.flow.FlowRuleOperationsContext;
28import org.onosproject.net.flow.FlowRuleService;
29import org.onosproject.net.flow.TrafficSelector;
30import org.onosproject.net.flow.TrafficTreatment;
31import org.onosproject.net.flow.criteria.Criterion;
32import org.onosproject.net.flow.criteria.Criteria;
33import org.onosproject.net.flow.criteria.EthCriterion;
34import org.onosproject.net.flow.criteria.PortCriterion;
35import org.onosproject.net.flow.criteria.EthTypeCriterion;
36import org.onosproject.net.flow.criteria.IPCriterion;
37import org.onosproject.net.flow.criteria.VlanIdCriterion;
38import org.onosproject.net.flow.instructions.Instruction;
39import org.onosproject.net.flow.instructions.L2ModificationInstruction;
40import org.onosproject.net.flowobjective.FilteringObjective;
41import org.onosproject.net.flowobjective.FlowObjectiveStore;
42import org.onosproject.net.flowobjective.ForwardingObjective;
43import org.onosproject.net.flowobjective.NextObjective;
44import org.onosproject.net.flowobjective.Objective;
45import org.onosproject.net.flowobjective.ObjectiveError;
46import org.onosproject.net.group.DefaultGroupBucket;
47import org.onosproject.net.group.DefaultGroupDescription;
48import org.onosproject.net.group.DefaultGroupKey;
49import org.onosproject.net.group.Group;
50import org.onosproject.net.group.GroupBucket;
51import org.onosproject.net.group.GroupBuckets;
52import org.onosproject.net.group.GroupDescription;
53import org.onosproject.net.group.GroupEvent;
54import org.onosproject.net.group.GroupKey;
55import org.onosproject.net.group.GroupListener;
56import org.onosproject.net.group.GroupService;
57import org.slf4j.Logger;
58
59import java.util.Collection;
60import java.util.Collections;
61import java.util.Set;
62import java.util.concurrent.Executors;
63import java.util.concurrent.ScheduledExecutorService;
64import java.util.concurrent.TimeUnit;
65import java.util.stream.Collectors;
66
67import static org.onlab.util.Tools.groupedThreads;
68import static org.slf4j.LoggerFactory.getLogger;
69
70/**
71 * Driver for Centec's V350 switches.
72 */
73public class CentecV350Pipeline extends AbstractHandlerBehaviour implements Pipeliner {
74
75 protected static final int PORT_VLAN_TABLE = 0;
76 protected static final int FILTER_TABLE = 1;
77 // TMAC is configured in MAC Table to redirect packets to ROUTE_TABLE.
78 protected static final int MAC_TABLE = 2;
79 protected static final int ROUTE_TABLE = 3;
80
81 private static final long DEFAULT_METADATA = 100;
Saurav Das6c44a632015-05-30 22:05:22 -070082 private static final long DEFAULT_METADATA_MASK = 0xffffffffffffffffL;
Saurav Dase3274c82015-05-24 17:21:56 -070083
84 // Priority used in PORT_VLAN Table, the only priority accepted is PORT_VLAN_TABLE_PRIORITY.
85 // The packet passed PORT+VLAN check will goto FILTER Table.
86 private static final int PORT_VLAN_TABLE_PRIORITY = 0xffff;
87
88 // Priority used in Filter Table.
89 private static final int FILTER_TABLE_CONTROLLER_PRIORITY = 500;
90 // TMAC priority should be lower than controller.
91 private static final int FILTER_TABLE_TMAC_PRIORITY = 200;
92 private static final int FILTER_TABLE_HIGHEST_PRIORITY = 0xffff;
93
94 // Priority used in MAC Table.
95 // We do exact matching for DMAC+metadata, so priority is ignored and required to be set to 0xffff.
96 private static final int MAC_TABLE_PRIORITY = 0xffff;
97
98 // Priority used in Route Table.
99 // We do LPM matching in Route Table, so priority is ignored and required to be set to 0xffff.
100 private static final int ROUTE_TABLE_PRIORITY = 0xffff;
101
102 private static final short BGP_PORT = 179;
103
104 private final Logger log = getLogger(getClass());
105
106 private ServiceDirectory serviceDirectory;
107 private FlowRuleService flowRuleService;
108 private CoreService coreService;
109 private GroupService groupService;
110 private FlowObjectiveStore flowObjectiveStore;
111 private DeviceId deviceId;
112 private ApplicationId appId;
113
114 private KryoNamespace appKryo = new KryoNamespace.Builder()
115 .register(GroupKey.class)
116 .register(DefaultGroupKey.class)
117 .register(CentecV350Group.class)
118 .register(byte[].class)
119 .build();
120
121 private Cache<GroupKey, NextObjective> pendingGroups;
122
123 private ScheduledExecutorService groupChecker =
124 Executors.newScheduledThreadPool(2, groupedThreads("onos/pipeliner",
125 "centec-V350-%d"));
126
127 @Override
128 public void init(DeviceId deviceId, PipelinerContext context) {
129 this.serviceDirectory = context.directory();
130 this.deviceId = deviceId;
131
132 pendingGroups = CacheBuilder.newBuilder()
133 .expireAfterWrite(20, TimeUnit.SECONDS)
134 .removalListener((RemovalNotification<GroupKey, NextObjective> notification) -> {
135 if (notification.getCause() == RemovalCause.EXPIRED) {
136 fail(notification.getValue(), ObjectiveError.GROUPINSTALLATIONFAILED);
137 }
138 }).build();
139
140 groupChecker.scheduleAtFixedRate(new GroupChecker(), 0, 500, TimeUnit.MILLISECONDS);
141
142 coreService = serviceDirectory.get(CoreService.class);
143 flowRuleService = serviceDirectory.get(FlowRuleService.class);
144 groupService = serviceDirectory.get(GroupService.class);
145 flowObjectiveStore = context.store();
146
147 groupService.addListener(new InnerGroupListener());
148
149 appId = coreService.registerApplication(
150 "org.onosproject.driver.CentecV350Pipeline");
151
152 initializePipeline();
153 }
154
155 @Override
156 public void filter(FilteringObjective filteringObjective) {
157 if (filteringObjective.type() == FilteringObjective.Type.PERMIT) {
158 processFilter(filteringObjective,
159 filteringObjective.op() == Objective.Operation.ADD,
160 filteringObjective.appId());
161 } else {
162 fail(filteringObjective, ObjectiveError.UNSUPPORTED);
163 }
164 }
165
166 @Override
167 public void forward(ForwardingObjective fwd) {
168 Collection<FlowRule> rules;
169 FlowRuleOperations.Builder flowBuilder = FlowRuleOperations.builder();
170
171 rules = processForward(fwd);
172 switch (fwd.op()) {
173 case ADD:
174 rules.stream()
175 .filter(rule -> rule != null)
176 .forEach(flowBuilder::add);
177 break;
178 case REMOVE:
179 rules.stream()
180 .filter(rule -> rule != null)
181 .forEach(flowBuilder::remove);
182 break;
183 default:
184 fail(fwd, ObjectiveError.UNKNOWN);
185 log.warn("Unknown forwarding type {}", fwd.op());
186 }
187
188
189 flowRuleService.apply(flowBuilder.build(new FlowRuleOperationsContext() {
190 @Override
191 public void onSuccess(FlowRuleOperations ops) {
192 pass(fwd);
193 }
194
195 @Override
196 public void onError(FlowRuleOperations ops) {
197 fail(fwd, ObjectiveError.FLOWINSTALLATIONFAILED);
198 }
199 }));
200
201 }
202
203 @Override
204 public void next(NextObjective nextObjective) {
205 switch (nextObjective.type()) {
206 case SIMPLE:
207 Collection<TrafficTreatment> treatments = nextObjective.next();
208 if (treatments.size() == 1) {
209 TrafficTreatment treatment = treatments.iterator().next();
210
211 // Since we do not support strip_vlan in PORT_VLAN table, we use mod_vlan
212 // to modify the packet to desired vlan.
213 // Note: if we use push_vlan here, the switch will add a second VLAN tag to the outgoing
214 // packet, which is not what we want.
215 TrafficTreatment.Builder treatmentWithoutPushVlan = DefaultTrafficTreatment.builder();
216 VlanId modVlanId;
217 for (Instruction ins : treatment.allInstructions()) {
218 if (ins.type() == Instruction.Type.L2MODIFICATION) {
219 L2ModificationInstruction l2ins = (L2ModificationInstruction) ins;
220 switch (l2ins.subtype()) {
221 case ETH_DST:
222 treatmentWithoutPushVlan.setEthDst(
223 ((L2ModificationInstruction.ModEtherInstruction) l2ins).mac());
224 break;
225 case ETH_SRC:
226 treatmentWithoutPushVlan.setEthSrc(
227 ((L2ModificationInstruction.ModEtherInstruction) l2ins).mac());
228 break;
229 case VLAN_ID:
230 modVlanId = ((L2ModificationInstruction.ModVlanIdInstruction) l2ins).vlanId();
231 treatmentWithoutPushVlan.setVlanId(modVlanId);
232 break;
233 default:
234 break;
235 }
236 } else if (ins.type() == Instruction.Type.OUTPUT) {
237 //long portNum = ((Instructions.OutputInstruction) ins).port().toLong();
238 treatmentWithoutPushVlan.add(ins);
239 } else {
240 // Ignore the vlan_pcp action since it's does matter much.
241 log.warn("Driver does not handle this type of TrafficTreatment"
242 + " instruction in nextObjectives: {}", ins.type());
243 }
244 }
245
246 GroupBucket bucket =
247 DefaultGroupBucket.createIndirectGroupBucket(treatmentWithoutPushVlan.build());
248 final GroupKey key = new DefaultGroupKey(appKryo.serialize(nextObjective.id()));
249 GroupDescription groupDescription
250 = new DefaultGroupDescription(deviceId,
251 GroupDescription.Type.INDIRECT,
252 new GroupBuckets(Collections
253 .singletonList(bucket)),
254 key,
255 null, // let group service determine group id
256 nextObjective.appId());
257 groupService.addGroup(groupDescription);
258 pendingGroups.put(key, nextObjective);
259 }
260 break;
261 case HASHED:
262 case BROADCAST:
263 case FAILOVER:
264 fail(nextObjective, ObjectiveError.UNSUPPORTED);
265 log.warn("Unsupported next objective type {}", nextObjective.type());
266 break;
267 default:
268 fail(nextObjective, ObjectiveError.UNKNOWN);
269 log.warn("Unknown next objective type {}", nextObjective.type());
270 }
271
272 }
273
274 private Collection<FlowRule> processForward(ForwardingObjective fwd) {
275 switch (fwd.flag()) {
276 case SPECIFIC:
277 return processSpecific(fwd);
278 case VERSATILE:
279 return processVersatile(fwd);
280 default:
281 fail(fwd, ObjectiveError.UNKNOWN);
282 log.warn("Unknown forwarding flag {}", fwd.flag());
283 }
284 return Collections.emptySet();
285 }
286
287 private Collection<FlowRule> processVersatile(ForwardingObjective fwd) {
288 log.warn("Driver does not support versatile forwarding objective");
289 fail(fwd, ObjectiveError.UNSUPPORTED);
290 return Collections.emptySet();
291 }
292
293 private Collection<FlowRule> processSpecific(ForwardingObjective fwd) {
294 log.debug("Processing specific forwarding objective");
295 TrafficSelector selector = fwd.selector();
296 EthTypeCriterion ethType =
297 (EthTypeCriterion) selector.getCriterion(Criterion.Type.ETH_TYPE);
298 if (ethType == null || ethType.ethType() != Ethernet.TYPE_IPV4) {
299 fail(fwd, ObjectiveError.UNSUPPORTED);
300 return Collections.emptySet();
301 }
302
303 // Must have metadata as key.
304 TrafficSelector filteredSelector =
305 DefaultTrafficSelector.builder()
306 .matchEthType(Ethernet.TYPE_IPV4)
307 .matchMetadata(DEFAULT_METADATA)
308 .matchIPDst(
309 ((IPCriterion)
310 selector.getCriterion(Criterion.Type.IPV4_DST)).ip())
311 .build();
312
313 TrafficTreatment.Builder tb = DefaultTrafficTreatment.builder();
314
315 if (fwd.nextId() != null) {
316 NextGroup next = flowObjectiveStore.getNextGroup(fwd.nextId());
317 GroupKey key = appKryo.deserialize(next.data());
318 Group group = groupService.getGroup(deviceId, key);
319 if (group == null) {
320 log.warn("The group left!");
321 fail(fwd, ObjectiveError.GROUPMISSING);
322 return Collections.emptySet();
323 }
324 tb.group(group.id());
325 }
326
327 FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
328 .fromApp(fwd.appId())
329 .withPriority(ROUTE_TABLE_PRIORITY)
330 .forDevice(deviceId)
331 .withSelector(filteredSelector)
332 .withTreatment(tb.build());
333
334 if (fwd.permanent()) {
335 ruleBuilder.makePermanent();
336 } else {
337 ruleBuilder.makeTemporary(fwd.timeout());
338 }
339
340 ruleBuilder.forTable(ROUTE_TABLE);
341
342 return Collections.singletonList(ruleBuilder.build());
343
344 }
345
346 private void processFilter(FilteringObjective filt, boolean install,
347 ApplicationId applicationId) {
348 PortCriterion p;
349 if (!filt.key().equals(Criteria.dummy()) &&
350 filt.key().type() == Criterion.Type.IN_PORT) {
351 p = (PortCriterion) filt.key();
352 } else {
353 log.warn("No key defined in filtering objective from app: {}. Not"
354 + "processing filtering objective", applicationId);
355 fail(filt, ObjectiveError.UNKNOWN);
356 return;
357 }
358
359 // Convert filtering conditions for switch-intfs into flow rules.
360 FlowRuleOperations.Builder ops = FlowRuleOperations.builder();
361
362 for (Criterion c : filt.conditions()) {
363 // Here we do a trick to install 2 flow rules to MAC_TABLE and ROUTE_TABLE.
364 if (c.type() == Criterion.Type.ETH_DST) {
365 EthCriterion e = (EthCriterion) c;
366
367 // Install TMAC flow rule.
368 log.debug("adding rule for Termination MAC in Filter Table: {}", e.mac());
369 TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
370 TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder();
371 selector.matchEthDst(e.mac());
372 // Add IPv4 matching explicitly since we will redirect it to ROUTE Table
373 // through MAC table.
374 selector.matchEthType(Ethernet.TYPE_IPV4);
375 treatment.transition(MAC_TABLE);
376 FlowRule rule = DefaultFlowRule.builder()
377 .forDevice(deviceId)
378 .withSelector(selector.build())
379 .withTreatment(treatment.build())
380 .withPriority(FILTER_TABLE_TMAC_PRIORITY)
381 .fromApp(applicationId)
382 .makePermanent()
383 .forTable(FILTER_TABLE).build();
384 ops = install ? ops.add(rule) : ops.remove(rule);
385
386 // Must install another rule to direct the IPv4 packets that hit TMAC to
387 // Route table.
388 log.debug("adding rule for Termination MAC in MAC Table: {}", e.mac());
389 selector = DefaultTrafficSelector.builder();
390 treatment = DefaultTrafficTreatment.builder();
391 selector.matchEthDst(e.mac());
392 // MAC_Table must have metadata matching configured, use the default metadata.
393 selector.matchMetadata(DEFAULT_METADATA);
394 treatment.transition(ROUTE_TABLE);
395 rule = DefaultFlowRule.builder()
396 .forDevice(deviceId)
397 .withSelector(selector.build())
398 .withTreatment(treatment.build())
399 .withPriority(MAC_TABLE_PRIORITY)
400 .fromApp(applicationId)
401 .makePermanent()
402 .forTable(MAC_TABLE).build();
403 ops = install ? ops.add(rule) : ops.remove(rule);
404 } else if (c.type() == Criterion.Type.VLAN_VID) {
405 VlanIdCriterion v = (VlanIdCriterion) c;
406 log.debug("adding rule for VLAN: {}", v.vlanId());
407 TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
408 TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder();
409 selector.matchVlanId(v.vlanId());
410 selector.matchInPort(p.port());
411 // Although the accepted packets will be sent to filter table, we must
412 // explicitly set goto_table instruction here.
Saurav Das86af8f12015-05-25 23:55:33 -0700413 treatment.writeMetadata(DEFAULT_METADATA, DEFAULT_METADATA_MASK);
414 // set default metadata written by PORT_VLAN Table.
Saurav Dase3274c82015-05-24 17:21:56 -0700415 treatment.transition(FILTER_TABLE);
416 // We do not support strip vlan here, treatment.deferred().popVlan();
Saurav Dase3274c82015-05-24 17:21:56 -0700417 // PORT_VLAN table only accept 0xffff priority since it does exact match only.
418 FlowRule rule = DefaultFlowRule.builder()
419 .forDevice(deviceId)
420 .withSelector(selector.build())
421 .withTreatment(treatment.build())
422 .withPriority(PORT_VLAN_TABLE_PRIORITY)
423 .fromApp(applicationId)
424 .makePermanent()
425 .forTable(PORT_VLAN_TABLE).build();
426 ops = install ? ops.add(rule) : ops.remove(rule);
427 } else if (c.type() == Criterion.Type.IPV4_DST) {
Saurav Das6c44a632015-05-30 22:05:22 -0700428 IPCriterion ipaddr = (IPCriterion) c;
429 log.debug("adding IP filtering rules in FILTER table: {}", ipaddr.ip());
430 TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
431 TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder();
432 selector.matchEthType(Ethernet.TYPE_IPV4);
433 selector.matchIPDst(ipaddr.ip()); // router IPs to the controller
434 treatment.setOutput(PortNumber.CONTROLLER);
435 FlowRule rule = DefaultFlowRule.builder()
436 .forDevice(deviceId)
437 .withSelector(selector.build())
438 .withTreatment(treatment.build())
439 .withPriority(FILTER_TABLE_CONTROLLER_PRIORITY)
440 .fromApp(applicationId)
441 .makePermanent()
442 .forTable(FILTER_TABLE).build();
443 ops = install ? ops.add(rule) : ops.remove(rule);
Saurav Dase3274c82015-05-24 17:21:56 -0700444 } else {
445 log.warn("Driver does not currently process filtering condition"
446 + " of type: {}", c.type());
447 fail(filt, ObjectiveError.UNSUPPORTED);
448 }
449 }
450
451 // apply filtering flow rules
452 flowRuleService.apply(ops.build(new FlowRuleOperationsContext() {
453 @Override
454 public void onSuccess(FlowRuleOperations ops) {
455 pass(filt);
456 log.info("Applied filtering rules");
457 }
458
459 @Override
460 public void onError(FlowRuleOperations ops) {
461 fail(filt, ObjectiveError.FLOWINSTALLATIONFAILED);
462 log.info("Failed to apply filtering rules");
463 }
464 }));
465 }
466
467 private void pass(Objective obj) {
468 if (obj.context().isPresent()) {
469 obj.context().get().onSuccess(obj);
470 }
471 }
472
473 private void fail(Objective obj, ObjectiveError error) {
474 if (obj.context().isPresent()) {
475 obj.context().get().onError(obj, error);
476 }
477 }
478
479 private void initializePipeline() {
480 // CENTEC_V350: PORT_VLAN_TABLE->FILTER_TABLE->MAC_TABLE(TMAC)->ROUTE_TABLE.
481 processPortVlanTable(true);
482 processFilterTable(true);
483 }
484
485 private void processPortVlanTable(boolean install) {
486 // By default the packet are dropped, need install port+vlan by some ways.
487
488 // XXX can we add table-miss-entry to drop? Code says drops by default
489 // XXX TTP description says default goes to table1.
490 // It also says that match is only on vlan -- not port-vlan -- which one is true?
491 }
492
493 private void processFilterTable(boolean install) {
494 TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
495 TrafficTreatment.Builder treatment = DefaultTrafficTreatment
496 .builder();
497 FlowRuleOperations.Builder ops = FlowRuleOperations.builder();
498 FlowRule rule;
499
500 // Punt ARP packets to controller by default.
501 selector.matchEthType(Ethernet.TYPE_ARP);
502 treatment.punt();
503 rule = DefaultFlowRule.builder()
504 .forDevice(deviceId)
505 .withSelector(selector.build())
506 .withTreatment(treatment.build())
507 .withPriority(FILTER_TABLE_CONTROLLER_PRIORITY)
508 .fromApp(appId)
509 .makePermanent()
510 .forTable(FILTER_TABLE).build();
511 ops = install ? ops.add(rule) : ops.remove(rule);
512
513 // Punt BGP packets to controller directly.
514 selector = DefaultTrafficSelector.builder();
515 treatment = DefaultTrafficTreatment.builder();
516 selector.matchEthType(Ethernet.TYPE_IPV4)
517 .matchIPProtocol(IPv4.PROTOCOL_TCP)
518 .matchTcpSrc(BGP_PORT);
519 treatment.punt();
520 rule = DefaultFlowRule.builder()
521 .forDevice(deviceId)
522 .withPriority(FILTER_TABLE_HIGHEST_PRIORITY)
523 .withSelector(selector.build())
524 .withTreatment(treatment.build())
525 .fromApp(appId)
526 .makePermanent()
527 .forTable(FILTER_TABLE).build();
528 ops = install ? ops.add(rule) : ops.remove(rule);
529
530 selector = DefaultTrafficSelector.builder();
531 treatment = DefaultTrafficTreatment.builder();
532 selector.matchEthType(Ethernet.TYPE_IPV4)
533 .matchIPProtocol(IPv4.PROTOCOL_TCP)
534 .matchTcpDst(BGP_PORT);
535 treatment.punt();
536 rule = DefaultFlowRule.builder()
537 .forDevice(deviceId)
538 .withPriority(FILTER_TABLE_HIGHEST_PRIORITY)
539 .withSelector(selector.build())
540 .withTreatment(treatment.build())
541 .fromApp(appId)
542 .makePermanent()
543 .forTable(FILTER_TABLE).build();
544
545 ops = install ? ops.add(rule) : ops.remove(rule);
546
547 // Packet will be discard in PORT_VLAN table, no need to install rule in
548 // filter table.
549
550 // XXX does not tell me if packets are going to be dropped by default in
551 // filter table or not? TTP says it will be dropped by default
552
553 flowRuleService.apply(ops.build(new FlowRuleOperationsContext() {
554 @Override
555 public void onSuccess(FlowRuleOperations ops) {
556 log.info("Provisioned filter table");
557 }
558
559 @Override
560 public void onError(FlowRuleOperations ops) {
561 log.info("Failed to provision filter table");
562 }
563 }));
564 }
565
566 private class InnerGroupListener implements GroupListener {
567 @Override
568 public void event(GroupEvent event) {
569 if (event.type() == GroupEvent.Type.GROUP_ADDED) {
570 GroupKey key = event.subject().appCookie();
571
572 NextObjective obj = pendingGroups.getIfPresent(key);
573 if (obj != null) {
574 flowObjectiveStore.putNextGroup(obj.id(), new CentecV350Group(key));
575 pass(obj);
576 pendingGroups.invalidate(key);
577 }
578 }
579 }
580 }
581
582
583 private class GroupChecker implements Runnable {
584
585 @Override
586 public void run() {
587 Set<GroupKey> keys = pendingGroups.asMap().keySet().stream()
588 .filter(key -> groupService.getGroup(deviceId, key) != null)
589 .collect(Collectors.toSet());
590
591 keys.stream().forEach(key -> {
592 NextObjective obj = pendingGroups.getIfPresent(key);
593 if (obj == null) {
594 return;
595 }
596 pass(obj);
597 pendingGroups.invalidate(key);
598 log.info("Heard back from group service for group {}. "
599 + "Applying pending forwarding objectives", obj.id());
600 flowObjectiveStore.putNextGroup(obj.id(), new CentecV350Group(key));
601 });
602 }
603 }
604
605 private class CentecV350Group implements NextGroup {
606
607 private final GroupKey key;
608
609 public CentecV350Group(GroupKey key) {
610 this.key = key;
611 }
612
613 @SuppressWarnings("unused")
614 public GroupKey key() {
615 return key;
616 }
617
618 @Override
619 public byte[] data() {
620 return appKryo.serialize(key);
621 }
622
623 }
624}