blob: 8eba5cf6196eeeecc2fae90f36d60b59a97c1d94 [file] [log] [blame]
yoonseon86bebed2017-02-03 15:23:57 -08001/*
2 * Copyright 2017-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.incubator.net.virtual.impl;
18
19import com.google.common.cache.Cache;
20import com.google.common.cache.CacheBuilder;
21import com.google.common.cache.RemovalCause;
22import com.google.common.cache.RemovalNotification;
23import com.google.common.collect.Maps;
24import com.google.common.collect.Sets;
25import org.onlab.osgi.ServiceDirectory;
26import org.onlab.util.KryoNamespace;
27import org.onosproject.incubator.net.virtual.AbstractVnetService;
28import org.onosproject.incubator.net.virtual.NetworkId;
29import org.onosproject.incubator.net.virtual.VirtualNetworkFlowObjectiveStore;
30import org.onosproject.incubator.net.virtual.VirtualNetworkService;
31import org.onosproject.net.DeviceId;
32import org.onosproject.net.behaviour.NextGroup;
33import org.onosproject.net.behaviour.Pipeliner;
34import org.onosproject.net.behaviour.PipelinerContext;
35import org.onosproject.net.device.DeviceService;
36import org.onosproject.net.driver.AbstractHandlerBehaviour;
37import org.onosproject.net.flow.DefaultFlowRule;
38import org.onosproject.net.flow.DefaultTrafficSelector;
39import org.onosproject.net.flow.DefaultTrafficTreatment;
40import org.onosproject.net.flow.FlowRule;
41import org.onosproject.net.flow.FlowRuleOperations;
42import org.onosproject.net.flow.FlowRuleOperationsContext;
43import org.onosproject.net.flow.FlowRuleService;
44import org.onosproject.net.flow.TrafficSelector;
45import org.onosproject.net.flow.TrafficTreatment;
46import org.onosproject.net.flowobjective.FilteringObjective;
47import org.onosproject.net.flowobjective.FlowObjectiveService;
48import org.onosproject.net.flowobjective.FlowObjectiveStore;
49import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
50import org.onosproject.net.flowobjective.ForwardingObjective;
51import org.onosproject.net.flowobjective.NextObjective;
52import org.onosproject.net.flowobjective.Objective;
53import org.onosproject.net.flowobjective.ObjectiveError;
54import org.onosproject.net.flowobjective.ObjectiveEvent;
55import org.onosproject.net.group.DefaultGroupKey;
56import org.onosproject.net.group.GroupKey;
57import org.slf4j.Logger;
58
59import java.util.ArrayList;
60import java.util.List;
61import java.util.Map;
62import java.util.Objects;
63import java.util.Set;
64import java.util.concurrent.ExecutorService;
65import java.util.concurrent.TimeUnit;
66
67import static com.google.common.base.Preconditions.checkNotNull;
68import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
69import static org.onlab.util.Tools.groupedThreads;
70import static org.slf4j.LoggerFactory.getLogger;
71
72/**
73 * Provides implementation of the flow objective programming service for virtual networks.
74 */
// NOTE: This manager provides the flow objective programming service for
// virtual networks. Virtual networks do not need to account for differing
// data-path pipeline implementations, but the interfaces and usage patterns
// of the flow objective service are still valuable for them.
// This manager therefore acts as an interpreter from FlowObjectives to
// FlowRules, offering interfaces symmetric with the ONOS core services.
// Its behaviour is based on DefaultSingleTablePipeline.
82
83public class VirtualNetworkFlowObjectiveManager extends AbstractVnetService
84 implements FlowObjectiveService {
85
86 public static final int INSTALL_RETRY_ATTEMPTS = 5;
87 public static final long INSTALL_RETRY_INTERVAL = 1000; // ms
88
89 private final Logger log = getLogger(getClass());
90
91 protected DeviceService deviceService;
92
93 // Note: The following dependencies are added on behalf of the pipeline
94 // driver behaviours to assure these services are available for their
95 // initialization.
96 protected FlowRuleService flowRuleService;
97
98 protected VirtualNetworkFlowObjectiveStore virtualFlowObjectiveStore;
99 protected FlowObjectiveStore flowObjectiveStore;
100 private final FlowObjectiveStoreDelegate delegate;
101
102 private final PipelinerContext context = new InnerPipelineContext();
103
104 private final Map<DeviceId, Pipeliner> pipeliners = Maps.newConcurrentMap();
105 private final Map<Integer, Set<PendingNext>> pendingForwards = Maps.newConcurrentMap();
106
107 // local store to track which nextObjectives were sent to which device
108 // for debugging purposes
109 private Map<Integer, DeviceId> nextToDevice = Maps.newConcurrentMap();
110
111 private ExecutorService executorService;
112
/**
 * Creates a flow objective manager bound to a single virtual network.
 *
 * @param manager   virtual network service used to look up per-network services
 * @param networkId identifier of the virtual network served by this manager
 */
public VirtualNetworkFlowObjectiveManager(VirtualNetworkService manager,
                                          NetworkId networkId) {
    super(manager, networkId);

    // Per-network services are resolved through the virtual network service.
    this.deviceService = manager.get(networkId(), DeviceService.class);
    this.flowRuleService = manager.get(networkId(), FlowRuleService.class);

    this.executorService = newFixedThreadPool(
            4,
            groupedThreads("onos/virtual/objective-installer", "%d", log));

    // Register for store notifications, then expose the virtual store
    // through the plain FlowObjectiveStore interface.
    this.virtualFlowObjectiveStore =
            serviceDirectory.get(VirtualNetworkFlowObjectiveStore.class);
    this.delegate = new InternalStoreDelegate();
    this.virtualFlowObjectiveStore.setDelegate(networkId(), this.delegate);
    this.flowObjectiveStore = new StoreConvertor();
}
128
129 @Override
130 public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
131 executorService.execute(new ObjectiveInstaller(deviceId, filteringObjective));
132 }
133
134 @Override
135 public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
136 if (queueObjective(deviceId, forwardingObjective)) {
137 return;
138 }
139 executorService.execute(new ObjectiveInstaller(deviceId, forwardingObjective));
140 }
141
142 @Override
143 public void next(DeviceId deviceId, NextObjective nextObjective) {
144 nextToDevice.put(nextObjective.id(), deviceId);
145 executorService.execute(new ObjectiveInstaller(deviceId, nextObjective));
146 }
147
148 @Override
149 public int allocateNextId() {
150 return flowObjectiveStore.allocateNextId();
151 }
152
/**
 * Does nothing; the supplied policy is ignored by this implementation.
 *
 * @param policy policy string (unused)
 */
@Override
public void initPolicy(String policy) {
    // Intentionally left empty.
}
157
158 @Override
159 public List<String> getNextMappings() {
160 List<String> mappings = new ArrayList<>();
161 Map<Integer, NextGroup> allnexts = flowObjectiveStore.getAllGroups();
162 // XXX if the NextGroup after de-serialization actually stored info of the deviceId
163 // then info on any nextObj could be retrieved from one controller instance.
164 // Right now the drivers on one instance can only fetch for next-ids that came
165 // to them.
166 // Also, we still need to send the right next-id to the right driver as potentially
167 // there can be different drivers for different devices. But on that account,
168 // no instance should be decoding for another instance's nextIds.
169
170 for (Map.Entry<Integer, NextGroup> e : allnexts.entrySet()) {
171 // get the device this next Objective was sent to
172 DeviceId deviceId = nextToDevice.get(e.getKey());
173 mappings.add("NextId " + e.getKey() + ": " +
174 ((deviceId != null) ? deviceId : "nextId not in this onos instance"));
175 if (deviceId != null) {
176 // this instance of the controller sent the nextObj to a driver
177 Pipeliner pipeliner = getDevicePipeliner(deviceId);
178 List<String> nextMappings = pipeliner.getNextMappings(e.getValue());
179 if (nextMappings != null) {
180 mappings.addAll(nextMappings);
181 }
182 }
183 }
184 return mappings;
185 }
186
187 @Override
188 public List<String> getPendingNexts() {
189 List<String> pendingNexts = new ArrayList<>();
190 for (Integer nextId : pendingForwards.keySet()) {
191 Set<PendingNext> pnext = pendingForwards.get(nextId);
192 StringBuilder pend = new StringBuilder();
193 pend.append("Next Id: ").append(Integer.toString(nextId))
194 .append(" :: ");
195 for (PendingNext pn : pnext) {
196 pend.append(Integer.toString(pn.forwardingObjective().id()))
197 .append(" ");
198 }
199 pendingNexts.add(pend.toString());
200 }
201 return pendingNexts;
202 }
203
204 private boolean queueObjective(DeviceId deviceId, ForwardingObjective fwd) {
205 if (fwd.nextId() == null ||
206 flowObjectiveStore.getNextGroup(fwd.nextId()) != null) {
207 // fast path
208 return false;
209 }
210 boolean queued = false;
211 synchronized (pendingForwards) {
212 // double check the flow objective store, because this block could run
213 // after a notification arrives
214 if (flowObjectiveStore.getNextGroup(fwd.nextId()) == null) {
215 pendingForwards.compute(fwd.nextId(), (id, pending) -> {
216 PendingNext next = new PendingNext(deviceId, fwd);
217 if (pending == null) {
218 return Sets.newHashSet(next);
219 } else {
220 pending.add(next);
221 return pending;
222 }
223 });
224 queued = true;
225 }
226 }
227 if (queued) {
228 log.debug("Queued forwarding objective {} for nextId {} meant for device {}",
229 fwd.id(), fwd.nextId(), deviceId);
230 }
231 return queued;
232 }
233
234 /**
235 * Task that passes the flow objective down to the driver. The task will
236 * make a few attempts to find the appropriate driver, then eventually give
237 * up and report an error if no suitable driver could be found.
238 */
239 private class ObjectiveInstaller implements Runnable {
240 private final DeviceId deviceId;
241 private final Objective objective;
242
243 private final int numAttempts;
244
245 public ObjectiveInstaller(DeviceId deviceId, Objective objective) {
246 this(deviceId, objective, 1);
247 }
248
249 public ObjectiveInstaller(DeviceId deviceId, Objective objective, int attemps) {
250 this.deviceId = checkNotNull(deviceId);
251 this.objective = checkNotNull(objective);
252 this.numAttempts = checkNotNull(attemps);
253 }
254
255 @Override
256 public void run() {
257 try {
258 Pipeliner pipeliner = getDevicePipeliner(deviceId);
259
260 if (pipeliner != null) {
261 if (objective instanceof NextObjective) {
262 pipeliner.next((NextObjective) objective);
263 } else if (objective instanceof ForwardingObjective) {
264 pipeliner.forward((ForwardingObjective) objective);
265 } else {
266 pipeliner.filter((FilteringObjective) objective);
267 }
268 //Attempts to check if pipeliner is null for retry attempts
269 } else if (numAttempts < INSTALL_RETRY_ATTEMPTS) {
270 Thread.sleep(INSTALL_RETRY_INTERVAL);
271 executorService.execute(new ObjectiveInstaller(deviceId, objective, numAttempts + 1));
272 } else {
273 // Otherwise we've tried a few times and failed, report an
274 // error back to the user.
275 objective.context().ifPresent(
276 c -> c.onError(objective, ObjectiveError.NOPIPELINER));
277 }
278 //Excpetion thrown
279 } catch (Exception e) {
280 log.warn("Exception while installing flow objective", e);
281 }
282 }
283 }
284
285 private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
286 @Override
287 public void notify(ObjectiveEvent event) {
288 if (event.type() == ObjectiveEvent.Type.ADD) {
289 log.debug("Received notification of obj event {}", event);
290 Set<PendingNext> pending;
291 synchronized (pendingForwards) {
292 // needs to be synchronized for queueObjective lookup
293 pending = pendingForwards.remove(event.subject());
294 }
295
296 if (pending == null) {
297 log.debug("Nothing pending for this obj event {}", event);
298 return;
299 }
300
301 log.debug("Processing {} pending forwarding objectives for nextId {}",
302 pending.size(), event.subject());
303 pending.forEach(p -> getDevicePipeliner(p.deviceId())
304 .forward(p.forwardingObjective()));
305 }
306 }
307 }
308
309 /**
310 * Retrieves (if it exists) the device pipeline behaviour from the cache.
311 * Otherwise it warms the caches and triggers the init method of the Pipeline.
312 * For virtual network, it returns OVS pipeliner.
313 *
314 * @param deviceId the id of the device associated to the pipeline
315 * @return the implementation of the Pipeliner behaviour
316 */
317 private Pipeliner getDevicePipeliner(DeviceId deviceId) {
318 return pipeliners.computeIfAbsent(deviceId, this::initPipelineHandler);
319 }
320
321 /**
322 * Creates and initialize {@link Pipeliner}.
323 * <p>
324 * Note: Expected to be called under per-Device lock.
325 * e.g., {@code pipeliners}' Map#compute family methods
326 *
327 * @param deviceId Device to initialize pipeliner
328 * @return {@link Pipeliner} instance or null
329 */
330 private Pipeliner initPipelineHandler(DeviceId deviceId) {
331 //FIXME: do we need a standard pipeline for virtual device?
332 Pipeliner pipeliner = new DefaultVirtualDevicePipeline();
333 pipeliner.init(deviceId, context);
334 return pipeliner;
335 }
336
337 // Processing context for initializing pipeline driver behaviours.
338 private class InnerPipelineContext implements PipelinerContext {
339 public ServiceDirectory directory() {
340 return serviceDirectory;
341 }
342
343 public FlowObjectiveStore store() {
344 return flowObjectiveStore;
345 }
346 }
347
348 /**
349 * Data class used to hold a pending forwarding objective that could not
350 * be processed because the associated next object was not present.
351 */
352 private class PendingNext {
353 private final DeviceId deviceId;
354 private final ForwardingObjective fwd;
355
356 public PendingNext(DeviceId deviceId, ForwardingObjective fwd) {
357 this.deviceId = deviceId;
358 this.fwd = fwd;
359 }
360
361 public DeviceId deviceId() {
362 return deviceId;
363 }
364
365 public ForwardingObjective forwardingObjective() {
366 return fwd;
367 }
368
369 @Override
370 public int hashCode() {
371 return Objects.hash(deviceId, fwd);
372 }
373
374 @Override
375 public boolean equals(final Object obj) {
376 if (this == obj) {
377 return true;
378 }
379 if (!(obj instanceof PendingNext)) {
380 return false;
381 }
382 final PendingNext other = (PendingNext) obj;
383 if (this.deviceId.equals(other.deviceId) &&
384 this.fwd.equals(other.fwd)) {
385 return true;
386 }
387 return false;
388 }
389 }
390
391 /**
392 * This class is a wrapping class from VirtualNetworkFlowObjectiveStore
393 * to FlowObjectiveStore for PipelinerContext.
394 */
395 private class StoreConvertor implements FlowObjectiveStore {
396
397 @Override
398 public void setDelegate(FlowObjectiveStoreDelegate delegate) {
399 virtualFlowObjectiveStore.setDelegate(networkId(), delegate);
400 }
401
402 @Override
403 public void unsetDelegate(FlowObjectiveStoreDelegate delegate) {
404 virtualFlowObjectiveStore.unsetDelegate(networkId(), delegate);
405 }
406
407 @Override
408 public boolean hasDelegate() {
409 return virtualFlowObjectiveStore.hasDelegate(networkId());
410 }
411
412 @Override
413 public void putNextGroup(Integer nextId, NextGroup group) {
414 virtualFlowObjectiveStore.putNextGroup(networkId(), nextId, group);
415 }
416
417 @Override
418 public NextGroup getNextGroup(Integer nextId) {
419 return virtualFlowObjectiveStore.getNextGroup(networkId(), nextId);
420 }
421
422 @Override
423 public NextGroup removeNextGroup(Integer nextId) {
424 return virtualFlowObjectiveStore.removeNextGroup(networkId(), nextId);
425 }
426
427 @Override
428 public Map<Integer, NextGroup> getAllGroups() {
429 return virtualFlowObjectiveStore.getAllGroups(networkId());
430 }
431
432 @Override
433 public int allocateNextId() {
434 return virtualFlowObjectiveStore.allocateNextId(networkId());
435 }
436 }
437
438 /**
439 * Simple single table pipeline abstraction for virtual networks.
440 */
441 private class DefaultVirtualDevicePipeline
442 extends AbstractHandlerBehaviour implements Pipeliner {
443
444 private final Logger log = getLogger(getClass());
445
446 private DeviceId deviceId;
447
448 private Cache<Integer, NextObjective> pendingNext;
449
450 private KryoNamespace appKryo = new KryoNamespace.Builder()
451 .register(GroupKey.class)
452 .register(DefaultGroupKey.class)
453 .register(SingleGroup.class)
454 .register(byte[].class)
455 .build("DefaultVirtualDevicePipeline");
456
457 @Override
458 public void init(DeviceId deviceId, PipelinerContext context) {
459 this.deviceId = deviceId;
460
461 pendingNext = CacheBuilder.newBuilder()
462 .expireAfterWrite(20, TimeUnit.SECONDS)
463 .removalListener((RemovalNotification<Integer, NextObjective> notification) -> {
464 if (notification.getCause() == RemovalCause.EXPIRED) {
465 notification.getValue().context()
466 .ifPresent(c -> c.onError(notification.getValue(),
467 ObjectiveError.FLOWINSTALLATIONFAILED));
468 }
469 }).build();
470 }
471
472 @Override
473 public void filter(FilteringObjective filter) {
474
475 TrafficTreatment.Builder actions;
476 switch (filter.type()) {
477 case PERMIT:
478 actions = (filter.meta() == null) ?
479 DefaultTrafficTreatment.builder().punt() :
480 DefaultTrafficTreatment.builder(filter.meta());
481 break;
482 case DENY:
483 actions = (filter.meta() == null) ?
484 DefaultTrafficTreatment.builder() :
485 DefaultTrafficTreatment.builder(filter.meta());
486 actions.drop();
487 break;
488 default:
489 log.warn("Unknown filter type: {}", filter.type());
490 actions = DefaultTrafficTreatment.builder().drop();
491 }
492
493 TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
494
495 filter.conditions().forEach(selector::add);
496
497 if (filter.key() != null) {
498 selector.add(filter.key());
499 }
500
501 FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
502 .forDevice(deviceId)
503 .withSelector(selector.build())
504 .withTreatment(actions.build())
505 .fromApp(filter.appId())
506 .withPriority(filter.priority());
507
508 if (filter.permanent()) {
509 ruleBuilder.makePermanent();
510 } else {
511 ruleBuilder.makeTemporary(filter.timeout());
512 }
513
514 installObjective(ruleBuilder, filter);
515 }
516
517 @Override
518 public void forward(ForwardingObjective fwd) {
519 TrafficSelector selector = fwd.selector();
520
521 if (fwd.treatment() != null) {
522 // Deal with SPECIFIC and VERSATILE in the same manner.
523 FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
524 .forDevice(deviceId)
525 .withSelector(selector)
526 .fromApp(fwd.appId())
527 .withPriority(fwd.priority())
528 .withTreatment(fwd.treatment());
529
530 if (fwd.permanent()) {
531 ruleBuilder.makePermanent();
532 } else {
533 ruleBuilder.makeTemporary(fwd.timeout());
534 }
535 installObjective(ruleBuilder, fwd);
536
537 } else {
538 NextObjective nextObjective = pendingNext.getIfPresent(fwd.nextId());
539 if (nextObjective != null) {
540 pendingNext.invalidate(fwd.nextId());
541 nextObjective.next().forEach(treat -> {
542 FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
543 .forDevice(deviceId)
544 .withSelector(selector)
545 .fromApp(fwd.appId())
546 .withPriority(fwd.priority())
547 .withTreatment(treat);
548
549 if (fwd.permanent()) {
550 ruleBuilder.makePermanent();
551 } else {
552 ruleBuilder.makeTemporary(fwd.timeout());
553 }
554 installObjective(ruleBuilder, fwd);
555 });
556 } else {
557 fwd.context().ifPresent(c -> c.onError(fwd,
558 ObjectiveError.GROUPMISSING));
559 }
560 }
561 }
562
563 private void installObjective(FlowRule.Builder ruleBuilder, Objective objective) {
564 FlowRuleOperations.Builder flowBuilder = FlowRuleOperations.builder();
565 switch (objective.op()) {
566
567 case ADD:
568 flowBuilder.add(ruleBuilder.build());
569 break;
570 case REMOVE:
571 flowBuilder.remove(ruleBuilder.build());
572 break;
573 default:
574 log.warn("Unknown operation {}", objective.op());
575 }
576
577 flowRuleService.apply(flowBuilder.build(new FlowRuleOperationsContext() {
578 @Override
579 public void onSuccess(FlowRuleOperations ops) {
580 objective.context().ifPresent(context -> context.onSuccess(objective));
581 }
582
583 @Override
584 public void onError(FlowRuleOperations ops) {
585 objective.context()
586 .ifPresent(context ->
587 context.onError(objective,
588 ObjectiveError.FLOWINSTALLATIONFAILED));
589 }
590 }));
591 }
592
593 @Override
594 public void next(NextObjective nextObjective) {
595
596 pendingNext.put(nextObjective.id(), nextObjective);
597 flowObjectiveStore.putNextGroup(nextObjective.id(),
598 new SingleGroup(
599 new DefaultGroupKey(
600 appKryo.serialize(nextObjective.id()))));
601 nextObjective.context().ifPresent(context -> context.onSuccess(nextObjective));
602 }
603
604 @Override
605 public List<String> getNextMappings(NextGroup nextGroup) {
606 // Default single table pipeline does not use nextObjectives or groups
607 return null;
608 }
609
610 private class SingleGroup implements NextGroup {
611
612 private final GroupKey key;
613
614 public SingleGroup(GroupKey key) {
615 this.key = key;
616 }
617
618 public GroupKey key() {
619 return key;
620 }
621
622 @Override
623 public byte[] data() {
624 return appKryo.serialize(key);
625 }
626 }
627 }
628}