/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.incubator.net.virtual.impl;
18
19import com.google.common.cache.Cache;
20import com.google.common.cache.CacheBuilder;
21import com.google.common.cache.RemovalCause;
22import com.google.common.cache.RemovalNotification;
23import com.google.common.collect.Maps;
24import com.google.common.collect.Sets;
25import org.onlab.osgi.ServiceDirectory;
26import org.onlab.util.KryoNamespace;
27import org.onosproject.incubator.net.virtual.AbstractVnetService;
28import org.onosproject.incubator.net.virtual.NetworkId;
29import org.onosproject.incubator.net.virtual.VirtualNetworkFlowObjectiveStore;
30import org.onosproject.incubator.net.virtual.VirtualNetworkService;
31import org.onosproject.net.DeviceId;
32import org.onosproject.net.behaviour.NextGroup;
33import org.onosproject.net.behaviour.Pipeliner;
34import org.onosproject.net.behaviour.PipelinerContext;
35import org.onosproject.net.device.DeviceService;
36import org.onosproject.net.driver.AbstractHandlerBehaviour;
37import org.onosproject.net.flow.DefaultFlowRule;
38import org.onosproject.net.flow.DefaultTrafficSelector;
39import org.onosproject.net.flow.DefaultTrafficTreatment;
40import org.onosproject.net.flow.FlowRule;
41import org.onosproject.net.flow.FlowRuleOperations;
42import org.onosproject.net.flow.FlowRuleOperationsContext;
43import org.onosproject.net.flow.FlowRuleService;
44import org.onosproject.net.flow.TrafficSelector;
45import org.onosproject.net.flow.TrafficTreatment;
46import org.onosproject.net.flowobjective.FilteringObjective;
47import org.onosproject.net.flowobjective.FlowObjectiveService;
48import org.onosproject.net.flowobjective.FlowObjectiveStore;
49import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
50import org.onosproject.net.flowobjective.ForwardingObjective;
51import org.onosproject.net.flowobjective.NextObjective;
52import org.onosproject.net.flowobjective.Objective;
53import org.onosproject.net.flowobjective.ObjectiveError;
54import org.onosproject.net.flowobjective.ObjectiveEvent;
55import org.onosproject.net.group.DefaultGroupKey;
56import org.onosproject.net.group.GroupKey;
57import org.slf4j.Logger;
58
59import java.util.ArrayList;
60import java.util.List;
61import java.util.Map;
62import java.util.Objects;
63import java.util.Set;
64import java.util.concurrent.ExecutorService;
65import java.util.concurrent.TimeUnit;
66
67import static com.google.common.base.Preconditions.checkNotNull;
68import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
69import static org.onlab.util.Tools.groupedThreads;
70import static org.slf4j.LoggerFactory.getLogger;
71
72/**
73 * Provides implementation of the flow objective programming service for virtual networks.
74 */
// NOTE: This manager provides the flow objective programming service for
// virtual networks. Virtual networks do not actually need to account for
// differing data-path pipeline implementations, but the interfaces and usage
// patterns of the flow objective service are still valuable for them.
// This manager therefore acts as an interpreter from FlowObjectives to
// FlowRules, providing interfaces symmetric with the ONOS core services.
// Its behaviour is based on DefaultSingleTablePipeline.
82
83public class VirtualNetworkFlowObjectiveManager extends AbstractVnetService
84 implements FlowObjectiveService {
85
86 public static final int INSTALL_RETRY_ATTEMPTS = 5;
87 public static final long INSTALL_RETRY_INTERVAL = 1000; // ms
88
89 private final Logger log = getLogger(getClass());
90
91 protected DeviceService deviceService;
92
93 // Note: The following dependencies are added on behalf of the pipeline
94 // driver behaviours to assure these services are available for their
95 // initialization.
96 protected FlowRuleService flowRuleService;
97
98 protected VirtualNetworkFlowObjectiveStore virtualFlowObjectiveStore;
99 protected FlowObjectiveStore flowObjectiveStore;
100 private final FlowObjectiveStoreDelegate delegate;
101
102 private final PipelinerContext context = new InnerPipelineContext();
103
104 private final Map<DeviceId, Pipeliner> pipeliners = Maps.newConcurrentMap();
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700105
106 // local stores for queuing fwd and next objectives that are waiting for an
107 // associated next objective execution to complete. The signal for completed
108 // execution comes from a pipeline driver, in this or another controller
109 // instance, via the DistributedFlowObjectiveStore.
110 private final Map<Integer, Set<PendingFlowObjective>> pendingForwards =
111 Maps.newConcurrentMap();
112 private final Map<Integer, Set<PendingFlowObjective>> pendingNexts =
113 Maps.newConcurrentMap();
yoonseon86bebed2017-02-03 15:23:57 -0800114
115 // local store to track which nextObjectives were sent to which device
116 // for debugging purposes
117 private Map<Integer, DeviceId> nextToDevice = Maps.newConcurrentMap();
118
119 private ExecutorService executorService;
120
/**
 * Creates a flow objective manager bound to a single virtual network.
 * <p>
 * Looks up the per-network device and flow rule services, starts the
 * installer thread pool and registers an internal delegate with the
 * virtual network flow objective store.
 *
 * @param manager   virtual network service used to resolve per-network services
 * @param networkId identifier of the virtual network served by this manager
 */
public VirtualNetworkFlowObjectiveManager(VirtualNetworkService manager,
                                          NetworkId networkId) {
    super(manager, networkId);

    // per-network services
    this.deviceService = manager.get(networkId(), DeviceService.class);
    this.flowRuleService = manager.get(networkId(), FlowRuleService.class);

    // worker pool running the ObjectiveInstaller tasks
    this.executorService = newFixedThreadPool(
            4, groupedThreads("onos/virtual/objective-installer", "%d", log));

    // store wiring: delegate first, then the FlowObjectiveStore adapter
    this.virtualFlowObjectiveStore =
            serviceDirectory.get(VirtualNetworkFlowObjectiveStore.class);
    this.delegate = new InternalStoreDelegate();
    this.virtualFlowObjectiveStore.setDelegate(networkId(), this.delegate);
    this.flowObjectiveStore = new StoreConvertor();
}
136
137 @Override
138 public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
139 executorService.execute(new ObjectiveInstaller(deviceId, filteringObjective));
140 }
141
142 @Override
143 public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700144 if (forwardingObjective.nextId() == null ||
145 forwardingObjective.op() == Objective.Operation.REMOVE ||
146 flowObjectiveStore.getNextGroup(forwardingObjective.nextId()) != null ||
147 !queueFwdObjective(deviceId, forwardingObjective)) {
148 // fast path
149 executorService.execute(new ObjectiveInstaller(deviceId, forwardingObjective));
yoonseon86bebed2017-02-03 15:23:57 -0800150 }
yoonseon86bebed2017-02-03 15:23:57 -0800151 }
152
153 @Override
154 public void next(DeviceId deviceId, NextObjective nextObjective) {
155 nextToDevice.put(nextObjective.id(), deviceId);
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700156 if (nextObjective.op() == Objective.Operation.ADD ||
157 flowObjectiveStore.getNextGroup(nextObjective.id()) != null ||
158 !queueNextObjective(deviceId, nextObjective)) {
159 // either group exists or we are trying to create it - let it through
160 executorService.execute(new ObjectiveInstaller(deviceId, nextObjective));
161 }
yoonseon86bebed2017-02-03 15:23:57 -0800162 }
163
164 @Override
165 public int allocateNextId() {
166 return flowObjectiveStore.allocateNextId();
167 }
168
169 @Override
170 public void initPolicy(String policy) {
171
172 }
173
174 @Override
175 public List<String> getNextMappings() {
176 List<String> mappings = new ArrayList<>();
177 Map<Integer, NextGroup> allnexts = flowObjectiveStore.getAllGroups();
178 // XXX if the NextGroup after de-serialization actually stored info of the deviceId
179 // then info on any nextObj could be retrieved from one controller instance.
180 // Right now the drivers on one instance can only fetch for next-ids that came
181 // to them.
182 // Also, we still need to send the right next-id to the right driver as potentially
183 // there can be different drivers for different devices. But on that account,
184 // no instance should be decoding for another instance's nextIds.
185
186 for (Map.Entry<Integer, NextGroup> e : allnexts.entrySet()) {
187 // get the device this next Objective was sent to
188 DeviceId deviceId = nextToDevice.get(e.getKey());
189 mappings.add("NextId " + e.getKey() + ": " +
190 ((deviceId != null) ? deviceId : "nextId not in this onos instance"));
191 if (deviceId != null) {
192 // this instance of the controller sent the nextObj to a driver
193 Pipeliner pipeliner = getDevicePipeliner(deviceId);
194 List<String> nextMappings = pipeliner.getNextMappings(e.getValue());
195 if (nextMappings != null) {
196 mappings.addAll(nextMappings);
197 }
198 }
199 }
200 return mappings;
201 }
202
203 @Override
Saurav Das1547b3f2017-05-05 17:01:08 -0700204 public List<String> getPendingFlowObjectives() {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700205 List<String> pendingFlowObjectives = new ArrayList<>();
206
yoonseon86bebed2017-02-03 15:23:57 -0800207 for (Integer nextId : pendingForwards.keySet()) {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700208 Set<PendingFlowObjective> pfwd = pendingForwards.get(nextId);
yoonseon86bebed2017-02-03 15:23:57 -0800209 StringBuilder pend = new StringBuilder();
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700210 pend.append("NextId: ")
211 .append(nextId);
212 for (PendingFlowObjective pf : pfwd) {
213 pend.append("\n FwdId: ")
214 .append(String.format("%11s", pf.flowObjective().id()))
215 .append(", DeviceId: ")
216 .append(pf.deviceId())
217 .append(", Selector: ")
218 .append(((ForwardingObjective) pf.flowObjective())
219 .selector().criteria());
yoonseon86bebed2017-02-03 15:23:57 -0800220 }
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700221 pendingFlowObjectives.add(pend.toString());
yoonseon86bebed2017-02-03 15:23:57 -0800222 }
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700223
224 for (Integer nextId : pendingNexts.keySet()) {
225 Set<PendingFlowObjective> pnext = pendingNexts.get(nextId);
226 StringBuilder pend = new StringBuilder();
227 pend.append("NextId: ")
228 .append(nextId);
229 for (PendingFlowObjective pn : pnext) {
230 pend.append("\n NextOp: ")
231 .append(pn.flowObjective().op())
232 .append(", DeviceId: ")
233 .append(pn.deviceId())
234 .append(", Treatments: ")
235 .append(((NextObjective) pn.flowObjective())
236 .next());
237 }
238 pendingFlowObjectives.add(pend.toString());
239 }
240
241 return pendingFlowObjectives;
yoonseon86bebed2017-02-03 15:23:57 -0800242 }
243
Saurav Das1547b3f2017-05-05 17:01:08 -0700244 @Override
245 public List<String> getPendingNexts() {
246 return getPendingFlowObjectives();
247 }
248
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700249 private boolean queueFwdObjective(DeviceId deviceId, ForwardingObjective fwd) {
yoonseon86bebed2017-02-03 15:23:57 -0800250 boolean queued = false;
251 synchronized (pendingForwards) {
252 // double check the flow objective store, because this block could run
253 // after a notification arrives
254 if (flowObjectiveStore.getNextGroup(fwd.nextId()) == null) {
255 pendingForwards.compute(fwd.nextId(), (id, pending) -> {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700256 PendingFlowObjective pendfo = new PendingFlowObjective(deviceId, fwd);
yoonseon86bebed2017-02-03 15:23:57 -0800257 if (pending == null) {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700258 return Sets.newHashSet(pendfo);
yoonseon86bebed2017-02-03 15:23:57 -0800259 } else {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700260 pending.add(pendfo);
yoonseon86bebed2017-02-03 15:23:57 -0800261 return pending;
262 }
263 });
264 queued = true;
265 }
266 }
267 if (queued) {
268 log.debug("Queued forwarding objective {} for nextId {} meant for device {}",
269 fwd.id(), fwd.nextId(), deviceId);
270 }
271 return queued;
272 }
273
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700274 private boolean queueNextObjective(DeviceId deviceId, NextObjective next) {
275
276 // we need to hold off on other operations till we get notified that the
277 // initial group creation has succeeded
278 boolean queued = false;
279 synchronized (pendingNexts) {
280 // double check the flow objective store, because this block could run
281 // after a notification arrives
282 if (flowObjectiveStore.getNextGroup(next.id()) == null) {
283 pendingNexts.compute(next.id(), (id, pending) -> {
284 PendingFlowObjective pendfo = new PendingFlowObjective(deviceId, next);
285 if (pending == null) {
286 return Sets.newHashSet(pendfo);
287 } else {
288 pending.add(pendfo);
289 return pending;
290 }
291 });
292 queued = true;
293 }
294 }
295 if (queued) {
296 log.debug("Queued next objective {} with operation {} meant for device {}",
297 next.id(), next.op(), deviceId);
298 }
299 return queued;
300 }
301
yoonseon86bebed2017-02-03 15:23:57 -0800302 /**
303 * Task that passes the flow objective down to the driver. The task will
304 * make a few attempts to find the appropriate driver, then eventually give
305 * up and report an error if no suitable driver could be found.
306 */
307 private class ObjectiveInstaller implements Runnable {
308 private final DeviceId deviceId;
309 private final Objective objective;
310
311 private final int numAttempts;
312
313 public ObjectiveInstaller(DeviceId deviceId, Objective objective) {
314 this(deviceId, objective, 1);
315 }
316
317 public ObjectiveInstaller(DeviceId deviceId, Objective objective, int attemps) {
318 this.deviceId = checkNotNull(deviceId);
319 this.objective = checkNotNull(objective);
Yuta HIGUCHIfbd9ae92018-01-24 23:39:06 -0800320 this.numAttempts = attemps;
yoonseon86bebed2017-02-03 15:23:57 -0800321 }
322
323 @Override
324 public void run() {
325 try {
326 Pipeliner pipeliner = getDevicePipeliner(deviceId);
327
328 if (pipeliner != null) {
329 if (objective instanceof NextObjective) {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700330 nextToDevice.put(objective.id(), deviceId);
yoonseon86bebed2017-02-03 15:23:57 -0800331 pipeliner.next((NextObjective) objective);
332 } else if (objective instanceof ForwardingObjective) {
333 pipeliner.forward((ForwardingObjective) objective);
334 } else {
335 pipeliner.filter((FilteringObjective) objective);
336 }
337 //Attempts to check if pipeliner is null for retry attempts
338 } else if (numAttempts < INSTALL_RETRY_ATTEMPTS) {
339 Thread.sleep(INSTALL_RETRY_INTERVAL);
340 executorService.execute(new ObjectiveInstaller(deviceId, objective, numAttempts + 1));
341 } else {
342 // Otherwise we've tried a few times and failed, report an
343 // error back to the user.
344 objective.context().ifPresent(
345 c -> c.onError(objective, ObjectiveError.NOPIPELINER));
346 }
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700347 //Exception thrown
yoonseon86bebed2017-02-03 15:23:57 -0800348 } catch (Exception e) {
349 log.warn("Exception while installing flow objective", e);
350 }
351 }
352 }
353
354 private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
355 @Override
356 public void notify(ObjectiveEvent event) {
357 if (event.type() == ObjectiveEvent.Type.ADD) {
358 log.debug("Received notification of obj event {}", event);
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700359 Set<PendingFlowObjective> pending;
360
361 // first send all pending flows
yoonseon86bebed2017-02-03 15:23:57 -0800362 synchronized (pendingForwards) {
363 // needs to be synchronized for queueObjective lookup
364 pending = pendingForwards.remove(event.subject());
365 }
yoonseon86bebed2017-02-03 15:23:57 -0800366 if (pending == null) {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700367 log.debug("No forwarding objectives pending for this "
368 + "obj event {}", event);
369 } else {
370 log.debug("Processing {} pending forwarding objectives for nextId {}",
371 pending.size(), event.subject());
372 pending.forEach(p -> getDevicePipeliner(p.deviceId())
373 .forward((ForwardingObjective) p.flowObjective()));
yoonseon86bebed2017-02-03 15:23:57 -0800374 }
375
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700376 // now check for pending next-objectives
377 synchronized (pendingNexts) {
378 // needs to be synchronized for queueObjective lookup
379 pending = pendingNexts.remove(event.subject());
380 }
381 if (pending == null) {
382 log.debug("No next objectives pending for this "
383 + "obj event {}", event);
384 } else {
385 log.debug("Processing {} pending next objectives for nextId {}",
386 pending.size(), event.subject());
387 pending.forEach(p -> getDevicePipeliner(p.deviceId())
388 .next((NextObjective) p.flowObjective()));
389 }
yoonseon86bebed2017-02-03 15:23:57 -0800390 }
391 }
392 }
393
394 /**
395 * Retrieves (if it exists) the device pipeline behaviour from the cache.
396 * Otherwise it warms the caches and triggers the init method of the Pipeline.
397 * For virtual network, it returns OVS pipeliner.
398 *
399 * @param deviceId the id of the device associated to the pipeline
400 * @return the implementation of the Pipeliner behaviour
401 */
402 private Pipeliner getDevicePipeliner(DeviceId deviceId) {
403 return pipeliners.computeIfAbsent(deviceId, this::initPipelineHandler);
404 }
405
406 /**
407 * Creates and initialize {@link Pipeliner}.
408 * <p>
409 * Note: Expected to be called under per-Device lock.
410 * e.g., {@code pipeliners}' Map#compute family methods
411 *
412 * @param deviceId Device to initialize pipeliner
413 * @return {@link Pipeliner} instance or null
414 */
415 private Pipeliner initPipelineHandler(DeviceId deviceId) {
416 //FIXME: do we need a standard pipeline for virtual device?
417 Pipeliner pipeliner = new DefaultVirtualDevicePipeline();
418 pipeliner.init(deviceId, context);
419 return pipeliner;
420 }
421
422 // Processing context for initializing pipeline driver behaviours.
423 private class InnerPipelineContext implements PipelinerContext {
Saurav Das1547b3f2017-05-05 17:01:08 -0700424 @Override
yoonseon86bebed2017-02-03 15:23:57 -0800425 public ServiceDirectory directory() {
426 return serviceDirectory;
427 }
428
Saurav Das1547b3f2017-05-05 17:01:08 -0700429 @Override
yoonseon86bebed2017-02-03 15:23:57 -0800430 public FlowObjectiveStore store() {
431 return flowObjectiveStore;
432 }
433 }
434
435 /**
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700436 * Data class used to hold a pending flow objective that could not
yoonseon86bebed2017-02-03 15:23:57 -0800437 * be processed because the associated next object was not present.
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700438 * Note that this pending flow objective could be a forwarding objective
439 * waiting for a next objective to complete execution. Or it could a
440 * next objective (with a different operation - remove, addToExisting, or
441 * removeFromExisting) waiting for a next objective with the same id to
442 * complete execution.
yoonseon86bebed2017-02-03 15:23:57 -0800443 */
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700444 private class PendingFlowObjective {
yoonseon86bebed2017-02-03 15:23:57 -0800445 private final DeviceId deviceId;
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700446 private final Objective flowObj;
yoonseon86bebed2017-02-03 15:23:57 -0800447
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700448 public PendingFlowObjective(DeviceId deviceId, Objective flowObj) {
yoonseon86bebed2017-02-03 15:23:57 -0800449 this.deviceId = deviceId;
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700450 this.flowObj = flowObj;
yoonseon86bebed2017-02-03 15:23:57 -0800451 }
452
453 public DeviceId deviceId() {
454 return deviceId;
455 }
456
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700457 public Objective flowObjective() {
458 return flowObj;
yoonseon86bebed2017-02-03 15:23:57 -0800459 }
460
461 @Override
462 public int hashCode() {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700463 return Objects.hash(deviceId, flowObj);
yoonseon86bebed2017-02-03 15:23:57 -0800464 }
465
466 @Override
467 public boolean equals(final Object obj) {
468 if (this == obj) {
469 return true;
470 }
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700471 if (!(obj instanceof PendingFlowObjective)) {
yoonseon86bebed2017-02-03 15:23:57 -0800472 return false;
473 }
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700474 final PendingFlowObjective other = (PendingFlowObjective) obj;
yoonseon86bebed2017-02-03 15:23:57 -0800475 if (this.deviceId.equals(other.deviceId) &&
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700476 this.flowObj.equals(other.flowObj)) {
yoonseon86bebed2017-02-03 15:23:57 -0800477 return true;
478 }
479 return false;
480 }
481 }
482
483 /**
484 * This class is a wrapping class from VirtualNetworkFlowObjectiveStore
485 * to FlowObjectiveStore for PipelinerContext.
486 */
487 private class StoreConvertor implements FlowObjectiveStore {
488
489 @Override
490 public void setDelegate(FlowObjectiveStoreDelegate delegate) {
491 virtualFlowObjectiveStore.setDelegate(networkId(), delegate);
492 }
493
494 @Override
495 public void unsetDelegate(FlowObjectiveStoreDelegate delegate) {
496 virtualFlowObjectiveStore.unsetDelegate(networkId(), delegate);
497 }
498
499 @Override
500 public boolean hasDelegate() {
501 return virtualFlowObjectiveStore.hasDelegate(networkId());
502 }
503
504 @Override
505 public void putNextGroup(Integer nextId, NextGroup group) {
506 virtualFlowObjectiveStore.putNextGroup(networkId(), nextId, group);
507 }
508
509 @Override
510 public NextGroup getNextGroup(Integer nextId) {
511 return virtualFlowObjectiveStore.getNextGroup(networkId(), nextId);
512 }
513
514 @Override
515 public NextGroup removeNextGroup(Integer nextId) {
516 return virtualFlowObjectiveStore.removeNextGroup(networkId(), nextId);
517 }
518
519 @Override
520 public Map<Integer, NextGroup> getAllGroups() {
521 return virtualFlowObjectiveStore.getAllGroups(networkId());
522 }
523
524 @Override
525 public int allocateNextId() {
526 return virtualFlowObjectiveStore.allocateNextId(networkId());
527 }
528 }
529
530 /**
531 * Simple single table pipeline abstraction for virtual networks.
532 */
533 private class DefaultVirtualDevicePipeline
534 extends AbstractHandlerBehaviour implements Pipeliner {
535
536 private final Logger log = getLogger(getClass());
537
538 private DeviceId deviceId;
539
540 private Cache<Integer, NextObjective> pendingNext;
541
542 private KryoNamespace appKryo = new KryoNamespace.Builder()
543 .register(GroupKey.class)
544 .register(DefaultGroupKey.class)
545 .register(SingleGroup.class)
546 .register(byte[].class)
547 .build("DefaultVirtualDevicePipeline");
548
549 @Override
550 public void init(DeviceId deviceId, PipelinerContext context) {
551 this.deviceId = deviceId;
552
553 pendingNext = CacheBuilder.newBuilder()
554 .expireAfterWrite(20, TimeUnit.SECONDS)
555 .removalListener((RemovalNotification<Integer, NextObjective> notification) -> {
556 if (notification.getCause() == RemovalCause.EXPIRED) {
557 notification.getValue().context()
558 .ifPresent(c -> c.onError(notification.getValue(),
559 ObjectiveError.FLOWINSTALLATIONFAILED));
560 }
561 }).build();
562 }
563
564 @Override
565 public void filter(FilteringObjective filter) {
566
567 TrafficTreatment.Builder actions;
568 switch (filter.type()) {
569 case PERMIT:
570 actions = (filter.meta() == null) ?
571 DefaultTrafficTreatment.builder().punt() :
572 DefaultTrafficTreatment.builder(filter.meta());
573 break;
574 case DENY:
575 actions = (filter.meta() == null) ?
576 DefaultTrafficTreatment.builder() :
577 DefaultTrafficTreatment.builder(filter.meta());
578 actions.drop();
579 break;
580 default:
581 log.warn("Unknown filter type: {}", filter.type());
582 actions = DefaultTrafficTreatment.builder().drop();
583 }
584
585 TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
586
587 filter.conditions().forEach(selector::add);
588
589 if (filter.key() != null) {
590 selector.add(filter.key());
591 }
592
593 FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
594 .forDevice(deviceId)
595 .withSelector(selector.build())
596 .withTreatment(actions.build())
597 .fromApp(filter.appId())
598 .withPriority(filter.priority());
599
600 if (filter.permanent()) {
601 ruleBuilder.makePermanent();
602 } else {
603 ruleBuilder.makeTemporary(filter.timeout());
604 }
605
606 installObjective(ruleBuilder, filter);
607 }
608
609 @Override
610 public void forward(ForwardingObjective fwd) {
611 TrafficSelector selector = fwd.selector();
612
613 if (fwd.treatment() != null) {
614 // Deal with SPECIFIC and VERSATILE in the same manner.
615 FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
616 .forDevice(deviceId)
617 .withSelector(selector)
618 .fromApp(fwd.appId())
619 .withPriority(fwd.priority())
620 .withTreatment(fwd.treatment());
621
622 if (fwd.permanent()) {
623 ruleBuilder.makePermanent();
624 } else {
625 ruleBuilder.makeTemporary(fwd.timeout());
626 }
627 installObjective(ruleBuilder, fwd);
628
629 } else {
630 NextObjective nextObjective = pendingNext.getIfPresent(fwd.nextId());
631 if (nextObjective != null) {
632 pendingNext.invalidate(fwd.nextId());
633 nextObjective.next().forEach(treat -> {
634 FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
635 .forDevice(deviceId)
636 .withSelector(selector)
637 .fromApp(fwd.appId())
638 .withPriority(fwd.priority())
639 .withTreatment(treat);
640
641 if (fwd.permanent()) {
642 ruleBuilder.makePermanent();
643 } else {
644 ruleBuilder.makeTemporary(fwd.timeout());
645 }
646 installObjective(ruleBuilder, fwd);
647 });
648 } else {
649 fwd.context().ifPresent(c -> c.onError(fwd,
650 ObjectiveError.GROUPMISSING));
651 }
652 }
653 }
654
655 private void installObjective(FlowRule.Builder ruleBuilder, Objective objective) {
656 FlowRuleOperations.Builder flowBuilder = FlowRuleOperations.builder();
657 switch (objective.op()) {
658
659 case ADD:
660 flowBuilder.add(ruleBuilder.build());
661 break;
662 case REMOVE:
663 flowBuilder.remove(ruleBuilder.build());
664 break;
665 default:
666 log.warn("Unknown operation {}", objective.op());
667 }
668
669 flowRuleService.apply(flowBuilder.build(new FlowRuleOperationsContext() {
670 @Override
671 public void onSuccess(FlowRuleOperations ops) {
672 objective.context().ifPresent(context -> context.onSuccess(objective));
673 }
674
675 @Override
676 public void onError(FlowRuleOperations ops) {
677 objective.context()
678 .ifPresent(context ->
679 context.onError(objective,
680 ObjectiveError.FLOWINSTALLATIONFAILED));
681 }
682 }));
683 }
684
685 @Override
686 public void next(NextObjective nextObjective) {
687
688 pendingNext.put(nextObjective.id(), nextObjective);
689 flowObjectiveStore.putNextGroup(nextObjective.id(),
690 new SingleGroup(
691 new DefaultGroupKey(
692 appKryo.serialize(nextObjective.id()))));
693 nextObjective.context().ifPresent(context -> context.onSuccess(nextObjective));
694 }
695
696 @Override
697 public List<String> getNextMappings(NextGroup nextGroup) {
698 // Default single table pipeline does not use nextObjectives or groups
699 return null;
700 }
701
702 private class SingleGroup implements NextGroup {
703
704 private final GroupKey key;
705
706 public SingleGroup(GroupKey key) {
707 this.key = key;
708 }
709
710 public GroupKey key() {
711 return key;
712 }
713
714 @Override
715 public byte[] data() {
716 return appKryo.serialize(key);
717 }
718 }
719 }
Saurav Das1547b3f2017-05-05 17:01:08 -0700720
yoonseon86bebed2017-02-03 15:23:57 -0800721}