blob: 0a5a509a5d8f7e45eb52eab65676ccaa6dd31786 [file] [log] [blame]
yoonseon86bebed2017-02-03 15:23:57 -08001/*
Thomas Vachuska52f2cd12018-11-08 21:20:04 -08002 * Copyright 2018-present Open Networking Foundation
yoonseon86bebed2017-02-03 15:23:57 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.incubator.net.virtual.impl;
18
19import com.google.common.cache.Cache;
20import com.google.common.cache.CacheBuilder;
21import com.google.common.cache.RemovalCause;
22import com.google.common.cache.RemovalNotification;
23import com.google.common.collect.Maps;
24import com.google.common.collect.Sets;
Harshada Chaundkar5a198b02019-07-03 16:27:45 +000025import com.google.common.collect.ImmutableMap;
26import org.apache.commons.lang3.tuple.Pair;
yoonseon86bebed2017-02-03 15:23:57 -080027import org.onlab.osgi.ServiceDirectory;
28import org.onlab.util.KryoNamespace;
29import org.onosproject.incubator.net.virtual.AbstractVnetService;
30import org.onosproject.incubator.net.virtual.NetworkId;
31import org.onosproject.incubator.net.virtual.VirtualNetworkFlowObjectiveStore;
32import org.onosproject.incubator.net.virtual.VirtualNetworkService;
33import org.onosproject.net.DeviceId;
34import org.onosproject.net.behaviour.NextGroup;
35import org.onosproject.net.behaviour.Pipeliner;
36import org.onosproject.net.behaviour.PipelinerContext;
37import org.onosproject.net.device.DeviceService;
38import org.onosproject.net.driver.AbstractHandlerBehaviour;
39import org.onosproject.net.flow.DefaultFlowRule;
40import org.onosproject.net.flow.DefaultTrafficSelector;
41import org.onosproject.net.flow.DefaultTrafficTreatment;
42import org.onosproject.net.flow.FlowRule;
43import org.onosproject.net.flow.FlowRuleOperations;
44import org.onosproject.net.flow.FlowRuleOperationsContext;
45import org.onosproject.net.flow.FlowRuleService;
46import org.onosproject.net.flow.TrafficSelector;
47import org.onosproject.net.flow.TrafficTreatment;
48import org.onosproject.net.flowobjective.FilteringObjective;
49import org.onosproject.net.flowobjective.FlowObjectiveService;
50import org.onosproject.net.flowobjective.FlowObjectiveStore;
51import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
52import org.onosproject.net.flowobjective.ForwardingObjective;
53import org.onosproject.net.flowobjective.NextObjective;
54import org.onosproject.net.flowobjective.Objective;
55import org.onosproject.net.flowobjective.ObjectiveError;
56import org.onosproject.net.flowobjective.ObjectiveEvent;
57import org.onosproject.net.group.DefaultGroupKey;
58import org.onosproject.net.group.GroupKey;
59import org.slf4j.Logger;
60
61import java.util.ArrayList;
62import java.util.List;
63import java.util.Map;
64import java.util.Objects;
65import java.util.Set;
66import java.util.concurrent.ExecutorService;
67import java.util.concurrent.TimeUnit;
68
69import static com.google.common.base.Preconditions.checkNotNull;
70import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
71import static org.onlab.util.Tools.groupedThreads;
72import static org.slf4j.LoggerFactory.getLogger;
73
74/**
75 * Provides implementation of the flow objective programming service for virtual networks.
76 */
77// NOTE: This manager is designed to provide flow objective programming service
78// for virtual networks. Actually, virtual networks don't need to consider
79// the different implementation of data-path pipeline. But, the interfaces
80// and usages of flow objective service are still valuable for virtual network.
81// This manager is working as an interpreter from FlowObjective to FlowRules
82// to provide symmetric interfaces with ONOS core services.
83// The behaviours are based on DefaultSingleTablePipeline.
84
85public class VirtualNetworkFlowObjectiveManager extends AbstractVnetService
86 implements FlowObjectiveService {
87
88 public static final int INSTALL_RETRY_ATTEMPTS = 5;
89 public static final long INSTALL_RETRY_INTERVAL = 1000; // ms
90
91 private final Logger log = getLogger(getClass());
92
93 protected DeviceService deviceService;
94
95 // Note: The following dependencies are added on behalf of the pipeline
96 // driver behaviours to assure these services are available for their
97 // initialization.
98 protected FlowRuleService flowRuleService;
99
100 protected VirtualNetworkFlowObjectiveStore virtualFlowObjectiveStore;
101 protected FlowObjectiveStore flowObjectiveStore;
102 private final FlowObjectiveStoreDelegate delegate;
103
104 private final PipelinerContext context = new InnerPipelineContext();
105
106 private final Map<DeviceId, Pipeliner> pipeliners = Maps.newConcurrentMap();
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700107
108 // local stores for queuing fwd and next objectives that are waiting for an
109 // associated next objective execution to complete. The signal for completed
110 // execution comes from a pipeline driver, in this or another controller
111 // instance, via the DistributedFlowObjectiveStore.
112 private final Map<Integer, Set<PendingFlowObjective>> pendingForwards =
113 Maps.newConcurrentMap();
114 private final Map<Integer, Set<PendingFlowObjective>> pendingNexts =
115 Maps.newConcurrentMap();
yoonseon86bebed2017-02-03 15:23:57 -0800116
117 // local store to track which nextObjectives were sent to which device
118 // for debugging purposes
119 private Map<Integer, DeviceId> nextToDevice = Maps.newConcurrentMap();
120
121 private ExecutorService executorService;
122
123 public VirtualNetworkFlowObjectiveManager(VirtualNetworkService manager,
124 NetworkId networkId) {
125 super(manager, networkId);
126
127 deviceService = manager.get(networkId(), DeviceService.class);
128 flowRuleService = manager.get(networkId(), FlowRuleService.class);
129
130 executorService = newFixedThreadPool(4, groupedThreads("onos/virtual/objective-installer", "%d", log));
131
132 virtualFlowObjectiveStore =
133 serviceDirectory.get(VirtualNetworkFlowObjectiveStore.class);
134 delegate = new InternalStoreDelegate();
135 virtualFlowObjectiveStore.setDelegate(networkId(), delegate);
136 flowObjectiveStore = new StoreConvertor();
137 }
138
139 @Override
140 public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
141 executorService.execute(new ObjectiveInstaller(deviceId, filteringObjective));
142 }
143
144 @Override
145 public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700146 if (forwardingObjective.nextId() == null ||
147 forwardingObjective.op() == Objective.Operation.REMOVE ||
148 flowObjectiveStore.getNextGroup(forwardingObjective.nextId()) != null ||
149 !queueFwdObjective(deviceId, forwardingObjective)) {
150 // fast path
151 executorService.execute(new ObjectiveInstaller(deviceId, forwardingObjective));
yoonseon86bebed2017-02-03 15:23:57 -0800152 }
yoonseon86bebed2017-02-03 15:23:57 -0800153 }
154
155 @Override
156 public void next(DeviceId deviceId, NextObjective nextObjective) {
157 nextToDevice.put(nextObjective.id(), deviceId);
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700158 if (nextObjective.op() == Objective.Operation.ADD ||
159 flowObjectiveStore.getNextGroup(nextObjective.id()) != null ||
160 !queueNextObjective(deviceId, nextObjective)) {
161 // either group exists or we are trying to create it - let it through
162 executorService.execute(new ObjectiveInstaller(deviceId, nextObjective));
163 }
yoonseon86bebed2017-02-03 15:23:57 -0800164 }
165
166 @Override
167 public int allocateNextId() {
168 return flowObjectiveStore.allocateNextId();
169 }
170
171 @Override
172 public void initPolicy(String policy) {
173
174 }
175
176 @Override
Harshada Chaundkar5a198b02019-07-03 16:27:45 +0000177 public Map<Pair<Integer, DeviceId>, List<String>> getNextMappingsChain() {
178 return ImmutableMap.of();
179 }
180
181 @Override
yoonseon86bebed2017-02-03 15:23:57 -0800182 public List<String> getNextMappings() {
183 List<String> mappings = new ArrayList<>();
184 Map<Integer, NextGroup> allnexts = flowObjectiveStore.getAllGroups();
185 // XXX if the NextGroup after de-serialization actually stored info of the deviceId
186 // then info on any nextObj could be retrieved from one controller instance.
187 // Right now the drivers on one instance can only fetch for next-ids that came
188 // to them.
189 // Also, we still need to send the right next-id to the right driver as potentially
190 // there can be different drivers for different devices. But on that account,
191 // no instance should be decoding for another instance's nextIds.
192
193 for (Map.Entry<Integer, NextGroup> e : allnexts.entrySet()) {
194 // get the device this next Objective was sent to
195 DeviceId deviceId = nextToDevice.get(e.getKey());
196 mappings.add("NextId " + e.getKey() + ": " +
197 ((deviceId != null) ? deviceId : "nextId not in this onos instance"));
198 if (deviceId != null) {
199 // this instance of the controller sent the nextObj to a driver
200 Pipeliner pipeliner = getDevicePipeliner(deviceId);
201 List<String> nextMappings = pipeliner.getNextMappings(e.getValue());
202 if (nextMappings != null) {
203 mappings.addAll(nextMappings);
204 }
205 }
206 }
207 return mappings;
208 }
209
210 @Override
Saurav Das1547b3f2017-05-05 17:01:08 -0700211 public List<String> getPendingFlowObjectives() {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700212 List<String> pendingFlowObjectives = new ArrayList<>();
213
yoonseon86bebed2017-02-03 15:23:57 -0800214 for (Integer nextId : pendingForwards.keySet()) {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700215 Set<PendingFlowObjective> pfwd = pendingForwards.get(nextId);
yoonseon86bebed2017-02-03 15:23:57 -0800216 StringBuilder pend = new StringBuilder();
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700217 pend.append("NextId: ")
218 .append(nextId);
219 for (PendingFlowObjective pf : pfwd) {
220 pend.append("\n FwdId: ")
221 .append(String.format("%11s", pf.flowObjective().id()))
222 .append(", DeviceId: ")
223 .append(pf.deviceId())
224 .append(", Selector: ")
225 .append(((ForwardingObjective) pf.flowObjective())
226 .selector().criteria());
yoonseon86bebed2017-02-03 15:23:57 -0800227 }
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700228 pendingFlowObjectives.add(pend.toString());
yoonseon86bebed2017-02-03 15:23:57 -0800229 }
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700230
231 for (Integer nextId : pendingNexts.keySet()) {
232 Set<PendingFlowObjective> pnext = pendingNexts.get(nextId);
233 StringBuilder pend = new StringBuilder();
234 pend.append("NextId: ")
235 .append(nextId);
236 for (PendingFlowObjective pn : pnext) {
237 pend.append("\n NextOp: ")
238 .append(pn.flowObjective().op())
239 .append(", DeviceId: ")
240 .append(pn.deviceId())
241 .append(", Treatments: ")
242 .append(((NextObjective) pn.flowObjective())
243 .next());
244 }
245 pendingFlowObjectives.add(pend.toString());
246 }
247
248 return pendingFlowObjectives;
yoonseon86bebed2017-02-03 15:23:57 -0800249 }
250
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700251 private boolean queueFwdObjective(DeviceId deviceId, ForwardingObjective fwd) {
yoonseon86bebed2017-02-03 15:23:57 -0800252 boolean queued = false;
253 synchronized (pendingForwards) {
254 // double check the flow objective store, because this block could run
255 // after a notification arrives
256 if (flowObjectiveStore.getNextGroup(fwd.nextId()) == null) {
257 pendingForwards.compute(fwd.nextId(), (id, pending) -> {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700258 PendingFlowObjective pendfo = new PendingFlowObjective(deviceId, fwd);
yoonseon86bebed2017-02-03 15:23:57 -0800259 if (pending == null) {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700260 return Sets.newHashSet(pendfo);
yoonseon86bebed2017-02-03 15:23:57 -0800261 } else {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700262 pending.add(pendfo);
yoonseon86bebed2017-02-03 15:23:57 -0800263 return pending;
264 }
265 });
266 queued = true;
267 }
268 }
269 if (queued) {
270 log.debug("Queued forwarding objective {} for nextId {} meant for device {}",
271 fwd.id(), fwd.nextId(), deviceId);
272 }
273 return queued;
274 }
275
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700276 private boolean queueNextObjective(DeviceId deviceId, NextObjective next) {
277
278 // we need to hold off on other operations till we get notified that the
279 // initial group creation has succeeded
280 boolean queued = false;
281 synchronized (pendingNexts) {
282 // double check the flow objective store, because this block could run
283 // after a notification arrives
284 if (flowObjectiveStore.getNextGroup(next.id()) == null) {
285 pendingNexts.compute(next.id(), (id, pending) -> {
286 PendingFlowObjective pendfo = new PendingFlowObjective(deviceId, next);
287 if (pending == null) {
288 return Sets.newHashSet(pendfo);
289 } else {
290 pending.add(pendfo);
291 return pending;
292 }
293 });
294 queued = true;
295 }
296 }
297 if (queued) {
298 log.debug("Queued next objective {} with operation {} meant for device {}",
299 next.id(), next.op(), deviceId);
300 }
301 return queued;
302 }
303
yoonseon86bebed2017-02-03 15:23:57 -0800304 /**
305 * Task that passes the flow objective down to the driver. The task will
306 * make a few attempts to find the appropriate driver, then eventually give
307 * up and report an error if no suitable driver could be found.
308 */
309 private class ObjectiveInstaller implements Runnable {
310 private final DeviceId deviceId;
311 private final Objective objective;
312
313 private final int numAttempts;
314
315 public ObjectiveInstaller(DeviceId deviceId, Objective objective) {
316 this(deviceId, objective, 1);
317 }
318
319 public ObjectiveInstaller(DeviceId deviceId, Objective objective, int attemps) {
320 this.deviceId = checkNotNull(deviceId);
321 this.objective = checkNotNull(objective);
Yuta HIGUCHIfbd9ae92018-01-24 23:39:06 -0800322 this.numAttempts = attemps;
yoonseon86bebed2017-02-03 15:23:57 -0800323 }
324
325 @Override
326 public void run() {
327 try {
328 Pipeliner pipeliner = getDevicePipeliner(deviceId);
329
330 if (pipeliner != null) {
331 if (objective instanceof NextObjective) {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700332 nextToDevice.put(objective.id(), deviceId);
yoonseon86bebed2017-02-03 15:23:57 -0800333 pipeliner.next((NextObjective) objective);
334 } else if (objective instanceof ForwardingObjective) {
335 pipeliner.forward((ForwardingObjective) objective);
336 } else {
337 pipeliner.filter((FilteringObjective) objective);
338 }
339 //Attempts to check if pipeliner is null for retry attempts
340 } else if (numAttempts < INSTALL_RETRY_ATTEMPTS) {
341 Thread.sleep(INSTALL_RETRY_INTERVAL);
342 executorService.execute(new ObjectiveInstaller(deviceId, objective, numAttempts + 1));
343 } else {
344 // Otherwise we've tried a few times and failed, report an
345 // error back to the user.
346 objective.context().ifPresent(
347 c -> c.onError(objective, ObjectiveError.NOPIPELINER));
348 }
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700349 //Exception thrown
yoonseon86bebed2017-02-03 15:23:57 -0800350 } catch (Exception e) {
351 log.warn("Exception while installing flow objective", e);
352 }
353 }
354 }
355
356 private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
357 @Override
358 public void notify(ObjectiveEvent event) {
359 if (event.type() == ObjectiveEvent.Type.ADD) {
360 log.debug("Received notification of obj event {}", event);
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700361 Set<PendingFlowObjective> pending;
362
363 // first send all pending flows
yoonseon86bebed2017-02-03 15:23:57 -0800364 synchronized (pendingForwards) {
365 // needs to be synchronized for queueObjective lookup
366 pending = pendingForwards.remove(event.subject());
367 }
yoonseon86bebed2017-02-03 15:23:57 -0800368 if (pending == null) {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700369 log.debug("No forwarding objectives pending for this "
370 + "obj event {}", event);
371 } else {
372 log.debug("Processing {} pending forwarding objectives for nextId {}",
373 pending.size(), event.subject());
374 pending.forEach(p -> getDevicePipeliner(p.deviceId())
375 .forward((ForwardingObjective) p.flowObjective()));
yoonseon86bebed2017-02-03 15:23:57 -0800376 }
377
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700378 // now check for pending next-objectives
379 synchronized (pendingNexts) {
380 // needs to be synchronized for queueObjective lookup
381 pending = pendingNexts.remove(event.subject());
382 }
383 if (pending == null) {
384 log.debug("No next objectives pending for this "
385 + "obj event {}", event);
386 } else {
387 log.debug("Processing {} pending next objectives for nextId {}",
388 pending.size(), event.subject());
389 pending.forEach(p -> getDevicePipeliner(p.deviceId())
390 .next((NextObjective) p.flowObjective()));
391 }
yoonseon86bebed2017-02-03 15:23:57 -0800392 }
393 }
394 }
395
396 /**
397 * Retrieves (if it exists) the device pipeline behaviour from the cache.
398 * Otherwise it warms the caches and triggers the init method of the Pipeline.
399 * For virtual network, it returns OVS pipeliner.
400 *
401 * @param deviceId the id of the device associated to the pipeline
402 * @return the implementation of the Pipeliner behaviour
403 */
404 private Pipeliner getDevicePipeliner(DeviceId deviceId) {
405 return pipeliners.computeIfAbsent(deviceId, this::initPipelineHandler);
406 }
407
408 /**
409 * Creates and initialize {@link Pipeliner}.
410 * <p>
411 * Note: Expected to be called under per-Device lock.
412 * e.g., {@code pipeliners}' Map#compute family methods
413 *
414 * @param deviceId Device to initialize pipeliner
415 * @return {@link Pipeliner} instance or null
416 */
417 private Pipeliner initPipelineHandler(DeviceId deviceId) {
418 //FIXME: do we need a standard pipeline for virtual device?
419 Pipeliner pipeliner = new DefaultVirtualDevicePipeline();
420 pipeliner.init(deviceId, context);
421 return pipeliner;
422 }
423
424 // Processing context for initializing pipeline driver behaviours.
425 private class InnerPipelineContext implements PipelinerContext {
Saurav Das1547b3f2017-05-05 17:01:08 -0700426 @Override
yoonseon86bebed2017-02-03 15:23:57 -0800427 public ServiceDirectory directory() {
428 return serviceDirectory;
429 }
430
Saurav Das1547b3f2017-05-05 17:01:08 -0700431 @Override
yoonseon86bebed2017-02-03 15:23:57 -0800432 public FlowObjectiveStore store() {
433 return flowObjectiveStore;
434 }
435 }
436
437 /**
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700438 * Data class used to hold a pending flow objective that could not
yoonseon86bebed2017-02-03 15:23:57 -0800439 * be processed because the associated next object was not present.
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700440 * Note that this pending flow objective could be a forwarding objective
441 * waiting for a next objective to complete execution. Or it could a
442 * next objective (with a different operation - remove, addToExisting, or
443 * removeFromExisting) waiting for a next objective with the same id to
444 * complete execution.
yoonseon86bebed2017-02-03 15:23:57 -0800445 */
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700446 private class PendingFlowObjective {
yoonseon86bebed2017-02-03 15:23:57 -0800447 private final DeviceId deviceId;
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700448 private final Objective flowObj;
yoonseon86bebed2017-02-03 15:23:57 -0800449
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700450 public PendingFlowObjective(DeviceId deviceId, Objective flowObj) {
yoonseon86bebed2017-02-03 15:23:57 -0800451 this.deviceId = deviceId;
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700452 this.flowObj = flowObj;
yoonseon86bebed2017-02-03 15:23:57 -0800453 }
454
455 public DeviceId deviceId() {
456 return deviceId;
457 }
458
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700459 public Objective flowObjective() {
460 return flowObj;
yoonseon86bebed2017-02-03 15:23:57 -0800461 }
462
463 @Override
464 public int hashCode() {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700465 return Objects.hash(deviceId, flowObj);
yoonseon86bebed2017-02-03 15:23:57 -0800466 }
467
468 @Override
469 public boolean equals(final Object obj) {
470 if (this == obj) {
471 return true;
472 }
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700473 if (!(obj instanceof PendingFlowObjective)) {
yoonseon86bebed2017-02-03 15:23:57 -0800474 return false;
475 }
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700476 final PendingFlowObjective other = (PendingFlowObjective) obj;
yoonseon86bebed2017-02-03 15:23:57 -0800477 if (this.deviceId.equals(other.deviceId) &&
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700478 this.flowObj.equals(other.flowObj)) {
yoonseon86bebed2017-02-03 15:23:57 -0800479 return true;
480 }
481 return false;
482 }
483 }
484
485 /**
486 * This class is a wrapping class from VirtualNetworkFlowObjectiveStore
487 * to FlowObjectiveStore for PipelinerContext.
488 */
489 private class StoreConvertor implements FlowObjectiveStore {
490
491 @Override
492 public void setDelegate(FlowObjectiveStoreDelegate delegate) {
493 virtualFlowObjectiveStore.setDelegate(networkId(), delegate);
494 }
495
496 @Override
497 public void unsetDelegate(FlowObjectiveStoreDelegate delegate) {
498 virtualFlowObjectiveStore.unsetDelegate(networkId(), delegate);
499 }
500
501 @Override
502 public boolean hasDelegate() {
503 return virtualFlowObjectiveStore.hasDelegate(networkId());
504 }
505
506 @Override
507 public void putNextGroup(Integer nextId, NextGroup group) {
508 virtualFlowObjectiveStore.putNextGroup(networkId(), nextId, group);
509 }
510
511 @Override
512 public NextGroup getNextGroup(Integer nextId) {
513 return virtualFlowObjectiveStore.getNextGroup(networkId(), nextId);
514 }
515
516 @Override
517 public NextGroup removeNextGroup(Integer nextId) {
518 return virtualFlowObjectiveStore.removeNextGroup(networkId(), nextId);
519 }
520
521 @Override
522 public Map<Integer, NextGroup> getAllGroups() {
523 return virtualFlowObjectiveStore.getAllGroups(networkId());
524 }
525
526 @Override
527 public int allocateNextId() {
528 return virtualFlowObjectiveStore.allocateNextId(networkId());
529 }
530 }
531
532 /**
533 * Simple single table pipeline abstraction for virtual networks.
534 */
535 private class DefaultVirtualDevicePipeline
536 extends AbstractHandlerBehaviour implements Pipeliner {
537
538 private final Logger log = getLogger(getClass());
539
540 private DeviceId deviceId;
541
542 private Cache<Integer, NextObjective> pendingNext;
543
544 private KryoNamespace appKryo = new KryoNamespace.Builder()
545 .register(GroupKey.class)
546 .register(DefaultGroupKey.class)
547 .register(SingleGroup.class)
548 .register(byte[].class)
549 .build("DefaultVirtualDevicePipeline");
550
551 @Override
552 public void init(DeviceId deviceId, PipelinerContext context) {
553 this.deviceId = deviceId;
554
555 pendingNext = CacheBuilder.newBuilder()
556 .expireAfterWrite(20, TimeUnit.SECONDS)
557 .removalListener((RemovalNotification<Integer, NextObjective> notification) -> {
558 if (notification.getCause() == RemovalCause.EXPIRED) {
559 notification.getValue().context()
560 .ifPresent(c -> c.onError(notification.getValue(),
561 ObjectiveError.FLOWINSTALLATIONFAILED));
562 }
563 }).build();
564 }
565
566 @Override
567 public void filter(FilteringObjective filter) {
568
569 TrafficTreatment.Builder actions;
570 switch (filter.type()) {
571 case PERMIT:
572 actions = (filter.meta() == null) ?
573 DefaultTrafficTreatment.builder().punt() :
574 DefaultTrafficTreatment.builder(filter.meta());
575 break;
576 case DENY:
577 actions = (filter.meta() == null) ?
578 DefaultTrafficTreatment.builder() :
579 DefaultTrafficTreatment.builder(filter.meta());
580 actions.drop();
581 break;
582 default:
583 log.warn("Unknown filter type: {}", filter.type());
584 actions = DefaultTrafficTreatment.builder().drop();
585 }
586
587 TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
588
589 filter.conditions().forEach(selector::add);
590
591 if (filter.key() != null) {
592 selector.add(filter.key());
593 }
594
595 FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
596 .forDevice(deviceId)
597 .withSelector(selector.build())
598 .withTreatment(actions.build())
599 .fromApp(filter.appId())
600 .withPriority(filter.priority());
601
602 if (filter.permanent()) {
603 ruleBuilder.makePermanent();
604 } else {
605 ruleBuilder.makeTemporary(filter.timeout());
606 }
607
608 installObjective(ruleBuilder, filter);
609 }
610
611 @Override
612 public void forward(ForwardingObjective fwd) {
613 TrafficSelector selector = fwd.selector();
614
615 if (fwd.treatment() != null) {
616 // Deal with SPECIFIC and VERSATILE in the same manner.
617 FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
618 .forDevice(deviceId)
619 .withSelector(selector)
620 .fromApp(fwd.appId())
621 .withPriority(fwd.priority())
622 .withTreatment(fwd.treatment());
623
624 if (fwd.permanent()) {
625 ruleBuilder.makePermanent();
626 } else {
627 ruleBuilder.makeTemporary(fwd.timeout());
628 }
629 installObjective(ruleBuilder, fwd);
630
631 } else {
632 NextObjective nextObjective = pendingNext.getIfPresent(fwd.nextId());
633 if (nextObjective != null) {
634 pendingNext.invalidate(fwd.nextId());
635 nextObjective.next().forEach(treat -> {
636 FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
637 .forDevice(deviceId)
638 .withSelector(selector)
639 .fromApp(fwd.appId())
640 .withPriority(fwd.priority())
641 .withTreatment(treat);
642
643 if (fwd.permanent()) {
644 ruleBuilder.makePermanent();
645 } else {
646 ruleBuilder.makeTemporary(fwd.timeout());
647 }
648 installObjective(ruleBuilder, fwd);
649 });
650 } else {
651 fwd.context().ifPresent(c -> c.onError(fwd,
652 ObjectiveError.GROUPMISSING));
653 }
654 }
655 }
656
657 private void installObjective(FlowRule.Builder ruleBuilder, Objective objective) {
658 FlowRuleOperations.Builder flowBuilder = FlowRuleOperations.builder();
659 switch (objective.op()) {
660
661 case ADD:
662 flowBuilder.add(ruleBuilder.build());
663 break;
664 case REMOVE:
665 flowBuilder.remove(ruleBuilder.build());
666 break;
667 default:
668 log.warn("Unknown operation {}", objective.op());
669 }
670
671 flowRuleService.apply(flowBuilder.build(new FlowRuleOperationsContext() {
672 @Override
673 public void onSuccess(FlowRuleOperations ops) {
674 objective.context().ifPresent(context -> context.onSuccess(objective));
675 }
676
677 @Override
678 public void onError(FlowRuleOperations ops) {
679 objective.context()
680 .ifPresent(context ->
681 context.onError(objective,
682 ObjectiveError.FLOWINSTALLATIONFAILED));
683 }
684 }));
685 }
686
687 @Override
688 public void next(NextObjective nextObjective) {
689
690 pendingNext.put(nextObjective.id(), nextObjective);
691 flowObjectiveStore.putNextGroup(nextObjective.id(),
692 new SingleGroup(
693 new DefaultGroupKey(
694 appKryo.serialize(nextObjective.id()))));
695 nextObjective.context().ifPresent(context -> context.onSuccess(nextObjective));
696 }
697
698 @Override
699 public List<String> getNextMappings(NextGroup nextGroup) {
700 // Default single table pipeline does not use nextObjectives or groups
701 return null;
702 }
703
704 private class SingleGroup implements NextGroup {
705
706 private final GroupKey key;
707
708 public SingleGroup(GroupKey key) {
709 this.key = key;
710 }
711
712 public GroupKey key() {
713 return key;
714 }
715
716 @Override
717 public byte[] data() {
718 return appKryo.serialize(key);
719 }
720 }
721 }
Saurav Das1547b3f2017-05-05 17:01:08 -0700722
yoonseon86bebed2017-02-03 15:23:57 -0800723}