blob: 8ae04d2eef4a2249f303c2ac5cfe9e71e71f90ef [file] [log] [blame]
Michele Santuari9a8d16d2016-03-24 10:37:58 -07001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.drivers.corsa;
18
19import com.google.common.cache.Cache;
20import com.google.common.cache.CacheBuilder;
21import com.google.common.cache.RemovalCause;
22import com.google.common.cache.RemovalNotification;
Michele Santuarid2c8b152016-03-30 17:57:56 -070023import com.google.common.collect.ImmutableSet;
Michele Santuari9a8d16d2016-03-24 10:37:58 -070024import org.onlab.osgi.ServiceDirectory;
25import org.onlab.packet.Ethernet;
26import org.onlab.util.KryoNamespace;
27import org.onosproject.core.ApplicationId;
28import org.onosproject.core.CoreService;
29import org.onosproject.net.DeviceId;
30import org.onosproject.net.behaviour.NextGroup;
31import org.onosproject.net.behaviour.Pipeliner;
32import org.onosproject.net.behaviour.PipelinerContext;
33import org.onosproject.net.device.DeviceService;
34import org.onosproject.net.driver.AbstractHandlerBehaviour;
35import org.onosproject.net.flow.DefaultFlowRule;
36import org.onosproject.net.flow.DefaultTrafficSelector;
37import org.onosproject.net.flow.DefaultTrafficTreatment;
38import org.onosproject.net.flow.FlowRule;
39import org.onosproject.net.flow.FlowRuleOperations;
40import org.onosproject.net.flow.FlowRuleOperationsContext;
41import org.onosproject.net.flow.FlowRuleService;
42import org.onosproject.net.flow.TrafficSelector;
43import org.onosproject.net.flow.TrafficTreatment;
44import org.onosproject.net.flow.criteria.Criteria;
45import org.onosproject.net.flow.criteria.Criterion;
46import org.onosproject.net.flow.criteria.EthCriterion;
47import org.onosproject.net.flow.criteria.EthTypeCriterion;
48import org.onosproject.net.flow.criteria.IPCriterion;
49import org.onosproject.net.flow.criteria.PortCriterion;
50import org.onosproject.net.flow.criteria.VlanIdCriterion;
51import org.onosproject.net.flowobjective.FilteringObjective;
52import org.onosproject.net.flowobjective.FlowObjectiveStore;
53import org.onosproject.net.flowobjective.ForwardingObjective;
54import org.onosproject.net.flowobjective.NextObjective;
55import org.onosproject.net.flowobjective.Objective;
56import org.onosproject.net.flowobjective.ObjectiveError;
57import org.onosproject.net.group.DefaultGroupBucket;
58import org.onosproject.net.group.DefaultGroupDescription;
59import org.onosproject.net.group.DefaultGroupKey;
60import org.onosproject.net.group.Group;
61import org.onosproject.net.group.GroupBucket;
62import org.onosproject.net.group.GroupBuckets;
63import org.onosproject.net.group.GroupDescription;
64import org.onosproject.net.group.GroupEvent;
65import org.onosproject.net.group.GroupKey;
66import org.onosproject.net.group.GroupListener;
67import org.onosproject.net.group.GroupService;
68import org.onosproject.net.meter.MeterService;
69import org.slf4j.Logger;
70
71import java.util.Collection;
72import java.util.Collections;
73import java.util.List;
74import java.util.Objects;
75import java.util.Set;
76import java.util.concurrent.Executors;
77import java.util.concurrent.ScheduledExecutorService;
78import java.util.concurrent.TimeUnit;
79import java.util.stream.Collectors;
80
81import static org.onlab.util.Tools.groupedThreads;
82import static org.onosproject.net.flow.FlowRule.Builder;
83import static org.slf4j.LoggerFactory.getLogger;
84
85/**
86 * Abstraction of the Corsa pipeline handler.
87 */
88public abstract class AbstractCorsaPipeline extends AbstractHandlerBehaviour implements Pipeliner {
89
90
91 private final Logger log = getLogger(getClass());
92
93 private ServiceDirectory serviceDirectory;
94 protected FlowRuleService flowRuleService;
95 private CoreService coreService;
96 private GroupService groupService;
97 protected MeterService meterService;
98 private FlowObjectiveStore flowObjectiveStore;
99 protected DeviceId deviceId;
100 protected ApplicationId appId;
101 protected DeviceService deviceService;
102
103 private KryoNamespace appKryo = new KryoNamespace.Builder()
104 .register(GroupKey.class)
105 .register(DefaultGroupKey.class)
106 .register(CorsaGroup.class)
107 .register(byte[].class)
108 .build();
109
110 private Cache<GroupKey, NextObjective> pendingGroups;
111
112 private ScheduledExecutorService groupChecker =
113 Executors.newScheduledThreadPool(2, groupedThreads("onos/pipeliner",
114 "ovs-corsa-%d"));
115
116 protected static final int CONTROLLER_PRIORITY = 255;
117 protected static final int DROP_PRIORITY = 0;
118 protected static final int HIGHEST_PRIORITY = 0xffff;
119 protected static final String APPID = "org.onosproject.drivers.corsa.CorsaPipeline";
120
121 @Override
122 public void init(DeviceId deviceId, PipelinerContext context) {
123 this.serviceDirectory = context.directory();
124 this.deviceId = deviceId;
125
126 pendingGroups = CacheBuilder.newBuilder()
127 .expireAfterWrite(20, TimeUnit.SECONDS)
128 .removalListener((RemovalNotification<GroupKey, NextObjective> notification) -> {
129 if (notification.getCause() == RemovalCause.EXPIRED) {
130 fail(notification.getValue(), ObjectiveError.GROUPINSTALLATIONFAILED);
131 }
132 }).build();
133
134 groupChecker.scheduleAtFixedRate(new GroupChecker(), 0, 500, TimeUnit.MILLISECONDS);
135
136 coreService = serviceDirectory.get(CoreService.class);
137 flowRuleService = serviceDirectory.get(FlowRuleService.class);
138 groupService = serviceDirectory.get(GroupService.class);
139 meterService = serviceDirectory.get(MeterService.class);
140 deviceService = serviceDirectory.get(DeviceService.class);
141 flowObjectiveStore = context.store();
142
143 groupService.addListener(new InnerGroupListener());
144
145 appId = coreService.registerApplication(APPID);
146
147 initializePipeline();
148 }
149
150 protected abstract void initializePipeline();
151
152 protected void pass(Objective obj) {
153 obj.context().ifPresent(context -> context.onSuccess(obj));
154 }
155
156 protected void fail(Objective obj, ObjectiveError error) {
157 obj.context().ifPresent(context -> context.onError(obj, error));
158 }
159
160 private class GroupChecker implements Runnable {
161
162 @Override
163 public void run() {
164 Set<GroupKey> keys = pendingGroups.asMap().keySet().stream()
165 .filter(key -> groupService.getGroup(deviceId, key) != null)
166 .collect(Collectors.toSet());
167
168 keys.stream().forEach(key -> {
169 NextObjective obj = pendingGroups.getIfPresent(key);
170 if (obj == null) {
171 return;
172 }
173 pass(obj);
174 pendingGroups.invalidate(key);
175 log.info("Heard back from group service for group {}. "
176 + "Applying pending forwarding objectives", obj.id());
177 flowObjectiveStore.putNextGroup(obj.id(), new CorsaGroup(key));
178 });
179 }
180 }
181
182 private class CorsaGroup implements NextGroup {
183
184 private final GroupKey key;
185
186 public CorsaGroup(GroupKey key) {
187 this.key = key;
188 }
189
190 public GroupKey key() {
191 return key;
192 }
193
194 @Override
195 public byte[] data() {
196 return appKryo.serialize(key);
197 }
198
199 }
200
201 @Override
202 public List<String> getNextMappings(NextGroup nextGroup) {
203 //TODO: to be implemented
204 return Collections.emptyList();
205 }
206
207 private class InnerGroupListener implements GroupListener {
208 @Override
209 public void event(GroupEvent event) {
210 if (event.type() == GroupEvent.Type.GROUP_ADDED) {
211 GroupKey key = event.subject().appCookie();
212
213 NextObjective obj = pendingGroups.getIfPresent(key);
214 if (obj != null) {
215 flowObjectiveStore.putNextGroup(obj.id(), new CorsaGroup(key));
216 pass(obj);
217 pendingGroups.invalidate(key);
218 }
219 }
220 }
221 }
222
223
224 @Override
225 public void filter(FilteringObjective filteringObjective) {
226 if (filteringObjective.type() == FilteringObjective.Type.PERMIT) {
227 processFilter(filteringObjective,
228 filteringObjective.op() == Objective.Operation.ADD,
229 filteringObjective.appId());
230 } else {
231 fail(filteringObjective, ObjectiveError.UNSUPPORTED);
232 }
233 }
234
235 private void processFilter(FilteringObjective filt, boolean install,
236 ApplicationId applicationId) {
237 // This driver only processes filtering criteria defined with switch
238 // ports as the key
239 PortCriterion port;
240 if (!filt.key().equals(Criteria.dummy()) &&
241 filt.key().type() == Criterion.Type.IN_PORT) {
242 port = (PortCriterion) filt.key();
243 } else {
244 log.warn("No key defined in filtering objective from app: {}. Not"
245 + "processing filtering objective", applicationId);
246 fail(filt, ObjectiveError.UNKNOWN);
247 return;
248 }
249 // convert filtering conditions for switch-intfs into flowrules
250 FlowRuleOperations.Builder ops = FlowRuleOperations.builder();
251 for (Criterion c : filt.conditions()) {
252 if (c.type() == Criterion.Type.ETH_DST) {
253 EthCriterion eth = (EthCriterion) c;
254 FlowRule.Builder rule = processEthFiler(filt, eth, port);
255 rule.forDevice(deviceId)
256 .fromApp(applicationId);
257 ops = install ? ops.add(rule.build()) : ops.remove(rule.build());
258
259 } else if (c.type() == Criterion.Type.VLAN_VID) {
260 VlanIdCriterion vlan = (VlanIdCriterion) c;
261 FlowRule.Builder rule = processVlanFiler(filt, vlan, port);
262 rule.forDevice(deviceId)
263 .fromApp(applicationId);
264 ops = install ? ops.add(rule.build()) : ops.remove(rule.build());
265
266 } else if (c.type() == Criterion.Type.IPV4_DST) {
267 IPCriterion ip = (IPCriterion) c;
268 FlowRule.Builder rule = processIpFilter(filt, ip, port);
269 rule.forDevice(deviceId)
270 .fromApp(applicationId);
271 ops = install ? ops.add(rule.build()) : ops.remove(rule.build());
272
273 } else {
274 log.warn("Driver does not currently process filtering condition"
275 + " of type: {}", c.type());
276 fail(filt, ObjectiveError.UNSUPPORTED);
277 }
278 }
279 // apply filtering flow rules
280 flowRuleService.apply(ops.build(new FlowRuleOperationsContext() {
281 @Override
282 public void onSuccess(FlowRuleOperations ops) {
283 pass(filt);
284 log.info("Applied filtering rules");
285 }
286
287 @Override
288 public void onError(FlowRuleOperations ops) {
289 fail(filt, ObjectiveError.FLOWINSTALLATIONFAILED);
290 log.info("Failed to apply filtering rules");
291 }
292 }));
293 }
294
295 protected abstract Builder processEthFiler(FilteringObjective filt,
296 EthCriterion eth, PortCriterion port);
297
298 protected abstract Builder processVlanFiler(FilteringObjective filt,
299 VlanIdCriterion vlan, PortCriterion port);
300
301 protected abstract Builder processIpFilter(FilteringObjective filt,
302 IPCriterion ip, PortCriterion port);
303
304
305 @Override
306 public void forward(ForwardingObjective fwd) {
307 Collection<FlowRule> rules;
308 FlowRuleOperations.Builder flowBuilder = FlowRuleOperations.builder();
309
310 rules = processForward(fwd);
311 switch (fwd.op()) {
312 case ADD:
313 rules.stream()
314 .filter(Objects::nonNull)
315 .forEach(flowBuilder::add);
316 break;
317 case REMOVE:
318 rules.stream()
319 .filter(Objects::nonNull)
320 .forEach(flowBuilder::remove);
321 break;
322 default:
323 fail(fwd, ObjectiveError.UNKNOWN);
324 log.warn("Unknown forwarding type {}", fwd.op());
325 }
326
327 flowRuleService.apply(flowBuilder.build(new FlowRuleOperationsContext() {
328 @Override
329 public void onSuccess(FlowRuleOperations ops) {
330 pass(fwd);
331 }
332
333 @Override
334 public void onError(FlowRuleOperations ops) {
335 fail(fwd, ObjectiveError.FLOWINSTALLATIONFAILED);
336 }
337 }));
338
339 }
340
341 private Collection<FlowRule> processForward(ForwardingObjective fwd) {
342 switch (fwd.flag()) {
343 case SPECIFIC:
344 return processSpecific(fwd);
345 case VERSATILE:
346 return processVersatile(fwd);
347 default:
348 fail(fwd, ObjectiveError.UNKNOWN);
349 log.warn("Unknown forwarding flag {}", fwd.flag());
350 }
Michele Santuarid2c8b152016-03-30 17:57:56 -0700351 return ImmutableSet.of();
Michele Santuari9a8d16d2016-03-24 10:37:58 -0700352 }
353
354 private Collection<FlowRule> processSpecific(ForwardingObjective fwd) {
355 log.debug("Processing specific forwarding objective");
356 TrafficSelector selector = fwd.selector();
357 EthTypeCriterion ethType =
358 (EthTypeCriterion) selector.getCriterion(Criterion.Type.ETH_TYPE);
359 if (ethType != null) {
360 short et = ethType.ethType().toShort();
361 if (et == Ethernet.TYPE_IPV4) {
362 return processSpecificRoute(fwd);
363 } else if (et == Ethernet.TYPE_VLAN) {
364 /* The ForwardingObjective must specify VLAN ethtype in order to use the Transit Circuit */
365 return processSpecificSwitch(fwd);
366 }
367 }
368
369 fail(fwd, ObjectiveError.UNSUPPORTED);
Michele Santuarid2c8b152016-03-30 17:57:56 -0700370 return ImmutableSet.of();
Michele Santuari9a8d16d2016-03-24 10:37:58 -0700371 }
372
373 protected Collection<FlowRule> processSpecificSwitch(ForwardingObjective fwd) {
374 /* Not supported by until CorsaPipelineV3 */
375 log.warn("Vlan switching not supported in ovs-corsa driver");
376 fail(fwd, ObjectiveError.UNSUPPORTED);
Michele Santuarid2c8b152016-03-30 17:57:56 -0700377 return ImmutableSet.of();
Michele Santuari9a8d16d2016-03-24 10:37:58 -0700378 }
379
380 private Collection<FlowRule> processVersatile(ForwardingObjective fwd) {
381 log.debug("Processing vesatile forwarding objective");
382 TrafficSelector selector = fwd.selector();
383
384 EthTypeCriterion ethType =
385 (EthTypeCriterion) selector.getCriterion(Criterion.Type.ETH_TYPE);
386 if (ethType == null) {
387 log.error("Versatile forwarding objective must include ethType");
388 fail(fwd, ObjectiveError.UNKNOWN);
Michele Santuarid2c8b152016-03-30 17:57:56 -0700389 return ImmutableSet.of();
Michele Santuari9a8d16d2016-03-24 10:37:58 -0700390 }
391 Builder rule = DefaultFlowRule.builder()
392 .forDevice(deviceId)
393 .withSelector(fwd.selector())
394 .withTreatment(fwd.treatment())
395 .withPriority(fwd.priority())
396 .fromApp(fwd.appId())
397 .makePermanent();
398 if (ethType.ethType().toShort() == Ethernet.TYPE_ARP) {
399 return processArpTraffic(fwd, rule);
400 } else if (ethType.ethType().toShort() == Ethernet.TYPE_LLDP ||
401 ethType.ethType().toShort() == Ethernet.TYPE_BSN) {
402 return processLinkDiscovery(fwd, rule);
403 } else if (ethType.ethType().toShort() == Ethernet.TYPE_IPV4) {
404 return processIpTraffic(fwd, rule);
405 }
406 log.warn("Driver does not support given versatile forwarding objective");
407 fail(fwd, ObjectiveError.UNSUPPORTED);
Michele Santuarid2c8b152016-03-30 17:57:56 -0700408 return ImmutableSet.of();
Michele Santuari9a8d16d2016-03-24 10:37:58 -0700409 }
410
411 protected abstract Collection<FlowRule> processArpTraffic(ForwardingObjective fwd, Builder rule);
412
413 protected abstract Collection<FlowRule> processLinkDiscovery(ForwardingObjective fwd, Builder rule);
414
415 protected abstract Collection<FlowRule> processIpTraffic(ForwardingObjective fwd, Builder rule);
416
417 private Collection<FlowRule> processSpecificRoute(ForwardingObjective fwd) {
418 TrafficSelector filteredSelector =
419 DefaultTrafficSelector.builder()
420 .matchEthType(Ethernet.TYPE_IPV4)
421 .matchIPDst(
422 ((IPCriterion) fwd.selector().getCriterion(Criterion.Type.IPV4_DST)).ip())
423 .build();
424
425 TrafficTreatment.Builder tb = processSpecificRoutingTreatment();
426
427 if (fwd.nextId() != null) {
428 NextGroup next = flowObjectiveStore.getNextGroup(fwd.nextId());
429 GroupKey key = appKryo.deserialize(next.data());
430 Group group = groupService.getGroup(deviceId, key);
431 if (group == null) {
432 log.warn("The group left!");
433 fail(fwd, ObjectiveError.GROUPMISSING);
Michele Santuarid2c8b152016-03-30 17:57:56 -0700434 return ImmutableSet.of();
Michele Santuari9a8d16d2016-03-24 10:37:58 -0700435 }
436 tb.group(group.id());
Michele Santuarid2c8b152016-03-30 17:57:56 -0700437 } else {
438 log.error("Missing NextObjective ID for ForwardingObjective {}", fwd.id());
439 fail(fwd, ObjectiveError.BADPARAMS);
440 return ImmutableSet.of();
Michele Santuari9a8d16d2016-03-24 10:37:58 -0700441 }
442 Builder ruleBuilder = DefaultFlowRule.builder()
443 .fromApp(fwd.appId())
444 .withPriority(fwd.priority())
445 .forDevice(deviceId)
446 .withSelector(filteredSelector)
447 .withTreatment(tb.build());
448
449 ruleBuilder = processSpecificRoutingRule(ruleBuilder);
450
451 if (fwd.permanent()) {
452 ruleBuilder.makePermanent();
453 } else {
454 ruleBuilder.makeTemporary(fwd.timeout());
455 }
456 return Collections.singletonList(ruleBuilder.build());
457 }
458
459 //Hook for modifying Route traffic treatment
460 protected TrafficTreatment.Builder processSpecificRoutingTreatment() {
461 return DefaultTrafficTreatment.builder();
462 }
463
464 //Hook for modifying Route flow rule
465 protected abstract Builder processSpecificRoutingRule(Builder rb);
466
467 @Override
468 public void next(NextObjective nextObjective) {
469 switch (nextObjective.type()) {
470 case SIMPLE:
471 Collection<TrafficTreatment> treatments = nextObjective.next();
472 if (treatments.size() == 1) {
473 TrafficTreatment treatment = treatments.iterator().next();
474 treatment = processNextTreatment(treatment);
475 GroupBucket bucket =
476 DefaultGroupBucket.createIndirectGroupBucket(treatment);
477 final GroupKey key = new DefaultGroupKey(appKryo.serialize(nextObjective.id()));
478 GroupDescription groupDescription
479 = new DefaultGroupDescription(deviceId,
480 GroupDescription.Type.INDIRECT,
481 new GroupBuckets(Collections
482 .singletonList(bucket)),
483 key,
484 null, // let group service determine group id
485 nextObjective.appId());
486 groupService.addGroup(groupDescription);
487 pendingGroups.put(key, nextObjective);
488 }
489 break;
490 case HASHED:
491 case BROADCAST:
492 case FAILOVER:
493 fail(nextObjective, ObjectiveError.UNSUPPORTED);
494 log.warn("Unsupported next objective type {}", nextObjective.type());
495 break;
496 default:
497 fail(nextObjective, ObjectiveError.UNKNOWN);
498 log.warn("Unknown next objective type {}", nextObjective.type());
499 }
500
501 }
502
503 //Hook for altering the NextObjective treatment
504 protected TrafficTreatment processNextTreatment(TrafficTreatment treatment) {
505 return treatment;
506 }
507
508 //Init helper: Table Miss = Drop
509 protected void processTableMissDrop(boolean install, int table, String description) {
510 TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
511
512 TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder();
513 treatment.drop();
514
515 FlowRule rule = DefaultFlowRule.builder()
516 .forDevice(deviceId)
517 .withSelector(selector.build())
518 .withTreatment(treatment.build())
519 .withPriority(DROP_PRIORITY)
520 .fromApp(appId)
521 .makePermanent()
522 .forTable(table).build();
523
524 processFlowRule(install, rule, description);
525 }
526
527 //Init helper: Table Miss = GoTo
528 protected void processTableMissGoTo(boolean install, int table, int goTo, String description) {
529 TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
530
531 TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder();
532 treatment.transition(goTo);
533
534 FlowRule rule = DefaultFlowRule.builder()
535 .forDevice(deviceId)
536 .withSelector(selector.build())
537 .withTreatment(treatment.build())
538 .withPriority(DROP_PRIORITY)
539 .fromApp(appId)
540 .makePermanent()
541 .forTable(table).build();
542
543 processFlowRule(install, rule, description);
544 }
545
546 //Init helper: Apply flow rule
547 protected void processFlowRule(boolean install, FlowRule rule, String description) {
548 FlowRuleOperations.Builder ops = FlowRuleOperations.builder();
549 ops = install ? ops.add(rule) : ops.remove(rule);
550
551 flowRuleService.apply(ops.build(new FlowRuleOperationsContext() {
552 @Override
553 public void onSuccess(FlowRuleOperations ops) {
554 log.info(description + " success: " + ops.toString() + ", " + rule.toString());
555 }
556
557 @Override
558 public void onError(FlowRuleOperations ops) {
559 log.info(description + " error: " + ops.toString() + ", " + rule.toString());
560 }
561 }));
562 }
563}