blob: b28a34e27cfb97436fb9fc777d998475b9d54c1a [file] [log] [blame]
yoonseon86bebed2017-02-03 15:23:57 -08001/*
2 * Copyright 2017-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.incubator.net.virtual.impl;
18
19import com.google.common.cache.Cache;
20import com.google.common.cache.CacheBuilder;
21import com.google.common.cache.RemovalCause;
22import com.google.common.cache.RemovalNotification;
23import com.google.common.collect.Maps;
24import com.google.common.collect.Sets;
25import org.onlab.osgi.ServiceDirectory;
26import org.onlab.util.KryoNamespace;
27import org.onosproject.incubator.net.virtual.AbstractVnetService;
28import org.onosproject.incubator.net.virtual.NetworkId;
29import org.onosproject.incubator.net.virtual.VirtualNetworkFlowObjectiveStore;
30import org.onosproject.incubator.net.virtual.VirtualNetworkService;
31import org.onosproject.net.DeviceId;
32import org.onosproject.net.behaviour.NextGroup;
33import org.onosproject.net.behaviour.Pipeliner;
34import org.onosproject.net.behaviour.PipelinerContext;
35import org.onosproject.net.device.DeviceService;
36import org.onosproject.net.driver.AbstractHandlerBehaviour;
37import org.onosproject.net.flow.DefaultFlowRule;
38import org.onosproject.net.flow.DefaultTrafficSelector;
39import org.onosproject.net.flow.DefaultTrafficTreatment;
40import org.onosproject.net.flow.FlowRule;
41import org.onosproject.net.flow.FlowRuleOperations;
42import org.onosproject.net.flow.FlowRuleOperationsContext;
43import org.onosproject.net.flow.FlowRuleService;
44import org.onosproject.net.flow.TrafficSelector;
45import org.onosproject.net.flow.TrafficTreatment;
46import org.onosproject.net.flowobjective.FilteringObjective;
47import org.onosproject.net.flowobjective.FlowObjectiveService;
48import org.onosproject.net.flowobjective.FlowObjectiveStore;
49import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
50import org.onosproject.net.flowobjective.ForwardingObjective;
51import org.onosproject.net.flowobjective.NextObjective;
52import org.onosproject.net.flowobjective.Objective;
53import org.onosproject.net.flowobjective.ObjectiveError;
54import org.onosproject.net.flowobjective.ObjectiveEvent;
55import org.onosproject.net.group.DefaultGroupKey;
56import org.onosproject.net.group.GroupKey;
57import org.slf4j.Logger;
58
59import java.util.ArrayList;
60import java.util.List;
61import java.util.Map;
62import java.util.Objects;
63import java.util.Set;
64import java.util.concurrent.ExecutorService;
65import java.util.concurrent.TimeUnit;
66
67import static com.google.common.base.Preconditions.checkNotNull;
68import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
69import static org.onlab.util.Tools.groupedThreads;
70import static org.slf4j.LoggerFactory.getLogger;
71
72/**
73 * Provides implementation of the flow objective programming service for virtual networks.
74 */
// NOTE: This manager provides the flow objective programming service for
// virtual networks. Virtual networks do not need to account for differing
// data-path pipeline implementations, but the interfaces and usage patterns
// of the flow objective service are still valuable to them.
// This manager therefore acts as an interpreter from FlowObjectives to
// FlowRules, offering interfaces symmetric with the ONOS core services.
// Its behaviour is based on DefaultSingleTablePipeline.
82
83public class VirtualNetworkFlowObjectiveManager extends AbstractVnetService
84 implements FlowObjectiveService {
85
86 public static final int INSTALL_RETRY_ATTEMPTS = 5;
87 public static final long INSTALL_RETRY_INTERVAL = 1000; // ms
88
89 private final Logger log = getLogger(getClass());
90
91 protected DeviceService deviceService;
92
93 // Note: The following dependencies are added on behalf of the pipeline
94 // driver behaviours to assure these services are available for their
95 // initialization.
96 protected FlowRuleService flowRuleService;
97
98 protected VirtualNetworkFlowObjectiveStore virtualFlowObjectiveStore;
99 protected FlowObjectiveStore flowObjectiveStore;
100 private final FlowObjectiveStoreDelegate delegate;
101
102 private final PipelinerContext context = new InnerPipelineContext();
103
104 private final Map<DeviceId, Pipeliner> pipeliners = Maps.newConcurrentMap();
105 private final Map<Integer, Set<PendingNext>> pendingForwards = Maps.newConcurrentMap();
106
107 // local store to track which nextObjectives were sent to which device
108 // for debugging purposes
109 private Map<Integer, DeviceId> nextToDevice = Maps.newConcurrentMap();
110
111 private ExecutorService executorService;
112
113 public VirtualNetworkFlowObjectiveManager(VirtualNetworkService manager,
114 NetworkId networkId) {
115 super(manager, networkId);
116
117 deviceService = manager.get(networkId(), DeviceService.class);
118 flowRuleService = manager.get(networkId(), FlowRuleService.class);
119
120 executorService = newFixedThreadPool(4, groupedThreads("onos/virtual/objective-installer", "%d", log));
121
122 virtualFlowObjectiveStore =
123 serviceDirectory.get(VirtualNetworkFlowObjectiveStore.class);
124 delegate = new InternalStoreDelegate();
125 virtualFlowObjectiveStore.setDelegate(networkId(), delegate);
126 flowObjectiveStore = new StoreConvertor();
127 }
128
129 @Override
130 public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
131 executorService.execute(new ObjectiveInstaller(deviceId, filteringObjective));
132 }
133
134 @Override
135 public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
136 if (queueObjective(deviceId, forwardingObjective)) {
137 return;
138 }
139 executorService.execute(new ObjectiveInstaller(deviceId, forwardingObjective));
140 }
141
142 @Override
143 public void next(DeviceId deviceId, NextObjective nextObjective) {
144 nextToDevice.put(nextObjective.id(), deviceId);
145 executorService.execute(new ObjectiveInstaller(deviceId, nextObjective));
146 }
147
148 @Override
149 public int allocateNextId() {
150 return flowObjectiveStore.allocateNextId();
151 }
152
153 @Override
154 public void initPolicy(String policy) {
155
156 }
157
158 @Override
159 public List<String> getNextMappings() {
160 List<String> mappings = new ArrayList<>();
161 Map<Integer, NextGroup> allnexts = flowObjectiveStore.getAllGroups();
162 // XXX if the NextGroup after de-serialization actually stored info of the deviceId
163 // then info on any nextObj could be retrieved from one controller instance.
164 // Right now the drivers on one instance can only fetch for next-ids that came
165 // to them.
166 // Also, we still need to send the right next-id to the right driver as potentially
167 // there can be different drivers for different devices. But on that account,
168 // no instance should be decoding for another instance's nextIds.
169
170 for (Map.Entry<Integer, NextGroup> e : allnexts.entrySet()) {
171 // get the device this next Objective was sent to
172 DeviceId deviceId = nextToDevice.get(e.getKey());
173 mappings.add("NextId " + e.getKey() + ": " +
174 ((deviceId != null) ? deviceId : "nextId not in this onos instance"));
175 if (deviceId != null) {
176 // this instance of the controller sent the nextObj to a driver
177 Pipeliner pipeliner = getDevicePipeliner(deviceId);
178 List<String> nextMappings = pipeliner.getNextMappings(e.getValue());
179 if (nextMappings != null) {
180 mappings.addAll(nextMappings);
181 }
182 }
183 }
184 return mappings;
185 }
186
187 @Override
Saurav Das1547b3f2017-05-05 17:01:08 -0700188 public List<String> getPendingFlowObjectives() {
yoonseon86bebed2017-02-03 15:23:57 -0800189 List<String> pendingNexts = new ArrayList<>();
190 for (Integer nextId : pendingForwards.keySet()) {
191 Set<PendingNext> pnext = pendingForwards.get(nextId);
192 StringBuilder pend = new StringBuilder();
193 pend.append("Next Id: ").append(Integer.toString(nextId))
194 .append(" :: ");
195 for (PendingNext pn : pnext) {
196 pend.append(Integer.toString(pn.forwardingObjective().id()))
197 .append(" ");
198 }
199 pendingNexts.add(pend.toString());
200 }
201 return pendingNexts;
202 }
203
Saurav Das1547b3f2017-05-05 17:01:08 -0700204 @Override
205 public List<String> getPendingNexts() {
206 return getPendingFlowObjectives();
207 }
208
yoonseon86bebed2017-02-03 15:23:57 -0800209 private boolean queueObjective(DeviceId deviceId, ForwardingObjective fwd) {
210 if (fwd.nextId() == null ||
211 flowObjectiveStore.getNextGroup(fwd.nextId()) != null) {
212 // fast path
213 return false;
214 }
215 boolean queued = false;
216 synchronized (pendingForwards) {
217 // double check the flow objective store, because this block could run
218 // after a notification arrives
219 if (flowObjectiveStore.getNextGroup(fwd.nextId()) == null) {
220 pendingForwards.compute(fwd.nextId(), (id, pending) -> {
221 PendingNext next = new PendingNext(deviceId, fwd);
222 if (pending == null) {
223 return Sets.newHashSet(next);
224 } else {
225 pending.add(next);
226 return pending;
227 }
228 });
229 queued = true;
230 }
231 }
232 if (queued) {
233 log.debug("Queued forwarding objective {} for nextId {} meant for device {}",
234 fwd.id(), fwd.nextId(), deviceId);
235 }
236 return queued;
237 }
238
239 /**
240 * Task that passes the flow objective down to the driver. The task will
241 * make a few attempts to find the appropriate driver, then eventually give
242 * up and report an error if no suitable driver could be found.
243 */
244 private class ObjectiveInstaller implements Runnable {
245 private final DeviceId deviceId;
246 private final Objective objective;
247
248 private final int numAttempts;
249
250 public ObjectiveInstaller(DeviceId deviceId, Objective objective) {
251 this(deviceId, objective, 1);
252 }
253
254 public ObjectiveInstaller(DeviceId deviceId, Objective objective, int attemps) {
255 this.deviceId = checkNotNull(deviceId);
256 this.objective = checkNotNull(objective);
257 this.numAttempts = checkNotNull(attemps);
258 }
259
260 @Override
261 public void run() {
262 try {
263 Pipeliner pipeliner = getDevicePipeliner(deviceId);
264
265 if (pipeliner != null) {
266 if (objective instanceof NextObjective) {
267 pipeliner.next((NextObjective) objective);
268 } else if (objective instanceof ForwardingObjective) {
269 pipeliner.forward((ForwardingObjective) objective);
270 } else {
271 pipeliner.filter((FilteringObjective) objective);
272 }
273 //Attempts to check if pipeliner is null for retry attempts
274 } else if (numAttempts < INSTALL_RETRY_ATTEMPTS) {
275 Thread.sleep(INSTALL_RETRY_INTERVAL);
276 executorService.execute(new ObjectiveInstaller(deviceId, objective, numAttempts + 1));
277 } else {
278 // Otherwise we've tried a few times and failed, report an
279 // error back to the user.
280 objective.context().ifPresent(
281 c -> c.onError(objective, ObjectiveError.NOPIPELINER));
282 }
283 //Excpetion thrown
284 } catch (Exception e) {
285 log.warn("Exception while installing flow objective", e);
286 }
287 }
288 }
289
290 private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
291 @Override
292 public void notify(ObjectiveEvent event) {
293 if (event.type() == ObjectiveEvent.Type.ADD) {
294 log.debug("Received notification of obj event {}", event);
295 Set<PendingNext> pending;
296 synchronized (pendingForwards) {
297 // needs to be synchronized for queueObjective lookup
298 pending = pendingForwards.remove(event.subject());
299 }
300
301 if (pending == null) {
302 log.debug("Nothing pending for this obj event {}", event);
303 return;
304 }
305
306 log.debug("Processing {} pending forwarding objectives for nextId {}",
307 pending.size(), event.subject());
308 pending.forEach(p -> getDevicePipeliner(p.deviceId())
309 .forward(p.forwardingObjective()));
310 }
311 }
312 }
313
314 /**
315 * Retrieves (if it exists) the device pipeline behaviour from the cache.
316 * Otherwise it warms the caches and triggers the init method of the Pipeline.
317 * For virtual network, it returns OVS pipeliner.
318 *
319 * @param deviceId the id of the device associated to the pipeline
320 * @return the implementation of the Pipeliner behaviour
321 */
322 private Pipeliner getDevicePipeliner(DeviceId deviceId) {
323 return pipeliners.computeIfAbsent(deviceId, this::initPipelineHandler);
324 }
325
326 /**
327 * Creates and initialize {@link Pipeliner}.
328 * <p>
329 * Note: Expected to be called under per-Device lock.
330 * e.g., {@code pipeliners}' Map#compute family methods
331 *
332 * @param deviceId Device to initialize pipeliner
333 * @return {@link Pipeliner} instance or null
334 */
335 private Pipeliner initPipelineHandler(DeviceId deviceId) {
336 //FIXME: do we need a standard pipeline for virtual device?
337 Pipeliner pipeliner = new DefaultVirtualDevicePipeline();
338 pipeliner.init(deviceId, context);
339 return pipeliner;
340 }
341
342 // Processing context for initializing pipeline driver behaviours.
343 private class InnerPipelineContext implements PipelinerContext {
Saurav Das1547b3f2017-05-05 17:01:08 -0700344 @Override
yoonseon86bebed2017-02-03 15:23:57 -0800345 public ServiceDirectory directory() {
346 return serviceDirectory;
347 }
348
Saurav Das1547b3f2017-05-05 17:01:08 -0700349 @Override
yoonseon86bebed2017-02-03 15:23:57 -0800350 public FlowObjectiveStore store() {
351 return flowObjectiveStore;
352 }
353 }
354
355 /**
356 * Data class used to hold a pending forwarding objective that could not
357 * be processed because the associated next object was not present.
358 */
359 private class PendingNext {
360 private final DeviceId deviceId;
361 private final ForwardingObjective fwd;
362
363 public PendingNext(DeviceId deviceId, ForwardingObjective fwd) {
364 this.deviceId = deviceId;
365 this.fwd = fwd;
366 }
367
368 public DeviceId deviceId() {
369 return deviceId;
370 }
371
372 public ForwardingObjective forwardingObjective() {
373 return fwd;
374 }
375
376 @Override
377 public int hashCode() {
378 return Objects.hash(deviceId, fwd);
379 }
380
381 @Override
382 public boolean equals(final Object obj) {
383 if (this == obj) {
384 return true;
385 }
386 if (!(obj instanceof PendingNext)) {
387 return false;
388 }
389 final PendingNext other = (PendingNext) obj;
390 if (this.deviceId.equals(other.deviceId) &&
391 this.fwd.equals(other.fwd)) {
392 return true;
393 }
394 return false;
395 }
396 }
397
398 /**
399 * This class is a wrapping class from VirtualNetworkFlowObjectiveStore
400 * to FlowObjectiveStore for PipelinerContext.
401 */
402 private class StoreConvertor implements FlowObjectiveStore {
403
404 @Override
405 public void setDelegate(FlowObjectiveStoreDelegate delegate) {
406 virtualFlowObjectiveStore.setDelegate(networkId(), delegate);
407 }
408
409 @Override
410 public void unsetDelegate(FlowObjectiveStoreDelegate delegate) {
411 virtualFlowObjectiveStore.unsetDelegate(networkId(), delegate);
412 }
413
414 @Override
415 public boolean hasDelegate() {
416 return virtualFlowObjectiveStore.hasDelegate(networkId());
417 }
418
419 @Override
420 public void putNextGroup(Integer nextId, NextGroup group) {
421 virtualFlowObjectiveStore.putNextGroup(networkId(), nextId, group);
422 }
423
424 @Override
425 public NextGroup getNextGroup(Integer nextId) {
426 return virtualFlowObjectiveStore.getNextGroup(networkId(), nextId);
427 }
428
429 @Override
430 public NextGroup removeNextGroup(Integer nextId) {
431 return virtualFlowObjectiveStore.removeNextGroup(networkId(), nextId);
432 }
433
434 @Override
435 public Map<Integer, NextGroup> getAllGroups() {
436 return virtualFlowObjectiveStore.getAllGroups(networkId());
437 }
438
439 @Override
440 public int allocateNextId() {
441 return virtualFlowObjectiveStore.allocateNextId(networkId());
442 }
443 }
444
445 /**
446 * Simple single table pipeline abstraction for virtual networks.
447 */
448 private class DefaultVirtualDevicePipeline
449 extends AbstractHandlerBehaviour implements Pipeliner {
450
451 private final Logger log = getLogger(getClass());
452
453 private DeviceId deviceId;
454
455 private Cache<Integer, NextObjective> pendingNext;
456
457 private KryoNamespace appKryo = new KryoNamespace.Builder()
458 .register(GroupKey.class)
459 .register(DefaultGroupKey.class)
460 .register(SingleGroup.class)
461 .register(byte[].class)
462 .build("DefaultVirtualDevicePipeline");
463
464 @Override
465 public void init(DeviceId deviceId, PipelinerContext context) {
466 this.deviceId = deviceId;
467
468 pendingNext = CacheBuilder.newBuilder()
469 .expireAfterWrite(20, TimeUnit.SECONDS)
470 .removalListener((RemovalNotification<Integer, NextObjective> notification) -> {
471 if (notification.getCause() == RemovalCause.EXPIRED) {
472 notification.getValue().context()
473 .ifPresent(c -> c.onError(notification.getValue(),
474 ObjectiveError.FLOWINSTALLATIONFAILED));
475 }
476 }).build();
477 }
478
479 @Override
480 public void filter(FilteringObjective filter) {
481
482 TrafficTreatment.Builder actions;
483 switch (filter.type()) {
484 case PERMIT:
485 actions = (filter.meta() == null) ?
486 DefaultTrafficTreatment.builder().punt() :
487 DefaultTrafficTreatment.builder(filter.meta());
488 break;
489 case DENY:
490 actions = (filter.meta() == null) ?
491 DefaultTrafficTreatment.builder() :
492 DefaultTrafficTreatment.builder(filter.meta());
493 actions.drop();
494 break;
495 default:
496 log.warn("Unknown filter type: {}", filter.type());
497 actions = DefaultTrafficTreatment.builder().drop();
498 }
499
500 TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
501
502 filter.conditions().forEach(selector::add);
503
504 if (filter.key() != null) {
505 selector.add(filter.key());
506 }
507
508 FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
509 .forDevice(deviceId)
510 .withSelector(selector.build())
511 .withTreatment(actions.build())
512 .fromApp(filter.appId())
513 .withPriority(filter.priority());
514
515 if (filter.permanent()) {
516 ruleBuilder.makePermanent();
517 } else {
518 ruleBuilder.makeTemporary(filter.timeout());
519 }
520
521 installObjective(ruleBuilder, filter);
522 }
523
524 @Override
525 public void forward(ForwardingObjective fwd) {
526 TrafficSelector selector = fwd.selector();
527
528 if (fwd.treatment() != null) {
529 // Deal with SPECIFIC and VERSATILE in the same manner.
530 FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
531 .forDevice(deviceId)
532 .withSelector(selector)
533 .fromApp(fwd.appId())
534 .withPriority(fwd.priority())
535 .withTreatment(fwd.treatment());
536
537 if (fwd.permanent()) {
538 ruleBuilder.makePermanent();
539 } else {
540 ruleBuilder.makeTemporary(fwd.timeout());
541 }
542 installObjective(ruleBuilder, fwd);
543
544 } else {
545 NextObjective nextObjective = pendingNext.getIfPresent(fwd.nextId());
546 if (nextObjective != null) {
547 pendingNext.invalidate(fwd.nextId());
548 nextObjective.next().forEach(treat -> {
549 FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
550 .forDevice(deviceId)
551 .withSelector(selector)
552 .fromApp(fwd.appId())
553 .withPriority(fwd.priority())
554 .withTreatment(treat);
555
556 if (fwd.permanent()) {
557 ruleBuilder.makePermanent();
558 } else {
559 ruleBuilder.makeTemporary(fwd.timeout());
560 }
561 installObjective(ruleBuilder, fwd);
562 });
563 } else {
564 fwd.context().ifPresent(c -> c.onError(fwd,
565 ObjectiveError.GROUPMISSING));
566 }
567 }
568 }
569
570 private void installObjective(FlowRule.Builder ruleBuilder, Objective objective) {
571 FlowRuleOperations.Builder flowBuilder = FlowRuleOperations.builder();
572 switch (objective.op()) {
573
574 case ADD:
575 flowBuilder.add(ruleBuilder.build());
576 break;
577 case REMOVE:
578 flowBuilder.remove(ruleBuilder.build());
579 break;
580 default:
581 log.warn("Unknown operation {}", objective.op());
582 }
583
584 flowRuleService.apply(flowBuilder.build(new FlowRuleOperationsContext() {
585 @Override
586 public void onSuccess(FlowRuleOperations ops) {
587 objective.context().ifPresent(context -> context.onSuccess(objective));
588 }
589
590 @Override
591 public void onError(FlowRuleOperations ops) {
592 objective.context()
593 .ifPresent(context ->
594 context.onError(objective,
595 ObjectiveError.FLOWINSTALLATIONFAILED));
596 }
597 }));
598 }
599
600 @Override
601 public void next(NextObjective nextObjective) {
602
603 pendingNext.put(nextObjective.id(), nextObjective);
604 flowObjectiveStore.putNextGroup(nextObjective.id(),
605 new SingleGroup(
606 new DefaultGroupKey(
607 appKryo.serialize(nextObjective.id()))));
608 nextObjective.context().ifPresent(context -> context.onSuccess(nextObjective));
609 }
610
611 @Override
612 public List<String> getNextMappings(NextGroup nextGroup) {
613 // Default single table pipeline does not use nextObjectives or groups
614 return null;
615 }
616
617 private class SingleGroup implements NextGroup {
618
619 private final GroupKey key;
620
621 public SingleGroup(GroupKey key) {
622 this.key = key;
623 }
624
625 public GroupKey key() {
626 return key;
627 }
628
629 @Override
630 public byte[] data() {
631 return appKryo.serialize(key);
632 }
633 }
634 }
Saurav Das1547b3f2017-05-05 17:01:08 -0700635
yoonseon86bebed2017-02-03 15:23:57 -0800636}