blob: b4c84b6ee553e2a28277c74e92cd51ad2a0c395f [file] [log] [blame]
yoonseon86bebed2017-02-03 15:23:57 -08001/*
Thomas Vachuska52f2cd12018-11-08 21:20:04 -08002 * Copyright 2018-present Open Networking Foundation
yoonseon86bebed2017-02-03 15:23:57 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.incubator.net.virtual.impl;
18
19import com.google.common.cache.Cache;
20import com.google.common.cache.CacheBuilder;
21import com.google.common.cache.RemovalCause;
22import com.google.common.cache.RemovalNotification;
23import com.google.common.collect.Maps;
24import com.google.common.collect.Sets;
Harshada Chaundkar5a198b02019-07-03 16:27:45 +000025import com.google.common.collect.ImmutableMap;
26import org.apache.commons.lang3.tuple.Pair;
yoonseon86bebed2017-02-03 15:23:57 -080027import org.onlab.osgi.ServiceDirectory;
28import org.onlab.util.KryoNamespace;
Daniele Moro06aac702021-07-19 22:39:22 +020029import org.onosproject.core.ApplicationId;
yoonseon86bebed2017-02-03 15:23:57 -080030import org.onosproject.incubator.net.virtual.AbstractVnetService;
31import org.onosproject.incubator.net.virtual.NetworkId;
32import org.onosproject.incubator.net.virtual.VirtualNetworkFlowObjectiveStore;
33import org.onosproject.incubator.net.virtual.VirtualNetworkService;
34import org.onosproject.net.DeviceId;
35import org.onosproject.net.behaviour.NextGroup;
36import org.onosproject.net.behaviour.Pipeliner;
37import org.onosproject.net.behaviour.PipelinerContext;
38import org.onosproject.net.device.DeviceService;
39import org.onosproject.net.driver.AbstractHandlerBehaviour;
40import org.onosproject.net.flow.DefaultFlowRule;
41import org.onosproject.net.flow.DefaultTrafficSelector;
42import org.onosproject.net.flow.DefaultTrafficTreatment;
43import org.onosproject.net.flow.FlowRule;
44import org.onosproject.net.flow.FlowRuleOperations;
45import org.onosproject.net.flow.FlowRuleOperationsContext;
46import org.onosproject.net.flow.FlowRuleService;
47import org.onosproject.net.flow.TrafficSelector;
48import org.onosproject.net.flow.TrafficTreatment;
49import org.onosproject.net.flowobjective.FilteringObjective;
50import org.onosproject.net.flowobjective.FlowObjectiveService;
51import org.onosproject.net.flowobjective.FlowObjectiveStore;
52import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
53import org.onosproject.net.flowobjective.ForwardingObjective;
54import org.onosproject.net.flowobjective.NextObjective;
55import org.onosproject.net.flowobjective.Objective;
56import org.onosproject.net.flowobjective.ObjectiveError;
57import org.onosproject.net.flowobjective.ObjectiveEvent;
58import org.onosproject.net.group.DefaultGroupKey;
59import org.onosproject.net.group.GroupKey;
60import org.slf4j.Logger;
61
62import java.util.ArrayList;
63import java.util.List;
64import java.util.Map;
65import java.util.Objects;
66import java.util.Set;
67import java.util.concurrent.ExecutorService;
68import java.util.concurrent.TimeUnit;
69
70import static com.google.common.base.Preconditions.checkNotNull;
71import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
72import static org.onlab.util.Tools.groupedThreads;
73import static org.slf4j.LoggerFactory.getLogger;
74
75/**
76 * Provides implementation of the flow objective programming service for virtual networks.
77 */
// NOTE: This manager provides the flow objective programming service for
// virtual networks. Virtual networks do not need to account for differing
// data-path pipeline implementations, but the interfaces and usage patterns
// of the flow objective service are still valuable for them.
// This manager acts as an interpreter from flow objectives to flow rules,
// offering interfaces symmetric with the ONOS core services.
// Its behaviour is based on DefaultSingleTablePipeline.
85
86public class VirtualNetworkFlowObjectiveManager extends AbstractVnetService
87 implements FlowObjectiveService {
88
89 public static final int INSTALL_RETRY_ATTEMPTS = 5;
90 public static final long INSTALL_RETRY_INTERVAL = 1000; // ms
91
92 private final Logger log = getLogger(getClass());
93
94 protected DeviceService deviceService;
95
96 // Note: The following dependencies are added on behalf of the pipeline
97 // driver behaviours to assure these services are available for their
98 // initialization.
99 protected FlowRuleService flowRuleService;
100
101 protected VirtualNetworkFlowObjectiveStore virtualFlowObjectiveStore;
102 protected FlowObjectiveStore flowObjectiveStore;
103 private final FlowObjectiveStoreDelegate delegate;
104
105 private final PipelinerContext context = new InnerPipelineContext();
106
107 private final Map<DeviceId, Pipeliner> pipeliners = Maps.newConcurrentMap();
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700108
109 // local stores for queuing fwd and next objectives that are waiting for an
110 // associated next objective execution to complete. The signal for completed
111 // execution comes from a pipeline driver, in this or another controller
112 // instance, via the DistributedFlowObjectiveStore.
113 private final Map<Integer, Set<PendingFlowObjective>> pendingForwards =
114 Maps.newConcurrentMap();
115 private final Map<Integer, Set<PendingFlowObjective>> pendingNexts =
116 Maps.newConcurrentMap();
yoonseon86bebed2017-02-03 15:23:57 -0800117
118 // local store to track which nextObjectives were sent to which device
119 // for debugging purposes
120 private Map<Integer, DeviceId> nextToDevice = Maps.newConcurrentMap();
121
122 private ExecutorService executorService;
123
/**
 * Creates a flow objective manager bound to a single virtual network.
 *
 * @param manager   virtual network service used to look up the per-network services
 * @param networkId identifier of the virtual network served by this manager
 */
public VirtualNetworkFlowObjectiveManager(VirtualNetworkService manager,
                                          NetworkId networkId) {
    super(manager, networkId);

    // Per-network services, resolved through the virtual network service
    this.deviceService = manager.get(networkId(), DeviceService.class);
    this.flowRuleService = manager.get(networkId(), FlowRuleService.class);

    // Worker pool that runs the objective installation tasks
    this.executorService = newFixedThreadPool(
            4, groupedThreads("onos/virtual/objective-installer", "%d", log));

    // Register for store events of this network, then expose the virtual
    // store through the plain FlowObjectiveStore interface
    this.virtualFlowObjectiveStore =
            serviceDirectory.get(VirtualNetworkFlowObjectiveStore.class);
    this.delegate = new InternalStoreDelegate();
    this.virtualFlowObjectiveStore.setDelegate(networkId(), this.delegate);
    this.flowObjectiveStore = new StoreConvertor();
}
139
140 @Override
141 public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
142 executorService.execute(new ObjectiveInstaller(deviceId, filteringObjective));
143 }
144
145 @Override
146 public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700147 if (forwardingObjective.nextId() == null ||
148 forwardingObjective.op() == Objective.Operation.REMOVE ||
149 flowObjectiveStore.getNextGroup(forwardingObjective.nextId()) != null ||
150 !queueFwdObjective(deviceId, forwardingObjective)) {
151 // fast path
152 executorService.execute(new ObjectiveInstaller(deviceId, forwardingObjective));
yoonseon86bebed2017-02-03 15:23:57 -0800153 }
yoonseon86bebed2017-02-03 15:23:57 -0800154 }
155
156 @Override
157 public void next(DeviceId deviceId, NextObjective nextObjective) {
158 nextToDevice.put(nextObjective.id(), deviceId);
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700159 if (nextObjective.op() == Objective.Operation.ADD ||
160 flowObjectiveStore.getNextGroup(nextObjective.id()) != null ||
161 !queueNextObjective(deviceId, nextObjective)) {
162 // either group exists or we are trying to create it - let it through
163 executorService.execute(new ObjectiveInstaller(deviceId, nextObjective));
164 }
yoonseon86bebed2017-02-03 15:23:57 -0800165 }
166
167 @Override
168 public int allocateNextId() {
169 return flowObjectiveStore.allocateNextId();
170 }
171
172 @Override
173 public void initPolicy(String policy) {
174
175 }
176
177 @Override
Harshada Chaundkar5a198b02019-07-03 16:27:45 +0000178 public Map<Pair<Integer, DeviceId>, List<String>> getNextMappingsChain() {
179 return ImmutableMap.of();
180 }
181
182 @Override
yoonseon86bebed2017-02-03 15:23:57 -0800183 public List<String> getNextMappings() {
184 List<String> mappings = new ArrayList<>();
185 Map<Integer, NextGroup> allnexts = flowObjectiveStore.getAllGroups();
186 // XXX if the NextGroup after de-serialization actually stored info of the deviceId
187 // then info on any nextObj could be retrieved from one controller instance.
188 // Right now the drivers on one instance can only fetch for next-ids that came
189 // to them.
190 // Also, we still need to send the right next-id to the right driver as potentially
191 // there can be different drivers for different devices. But on that account,
192 // no instance should be decoding for another instance's nextIds.
193
194 for (Map.Entry<Integer, NextGroup> e : allnexts.entrySet()) {
195 // get the device this next Objective was sent to
196 DeviceId deviceId = nextToDevice.get(e.getKey());
197 mappings.add("NextId " + e.getKey() + ": " +
198 ((deviceId != null) ? deviceId : "nextId not in this onos instance"));
199 if (deviceId != null) {
200 // this instance of the controller sent the nextObj to a driver
201 Pipeliner pipeliner = getDevicePipeliner(deviceId);
202 List<String> nextMappings = pipeliner.getNextMappings(e.getValue());
203 if (nextMappings != null) {
204 mappings.addAll(nextMappings);
205 }
206 }
207 }
208 return mappings;
209 }
210
211 @Override
Saurav Das1547b3f2017-05-05 17:01:08 -0700212 public List<String> getPendingFlowObjectives() {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700213 List<String> pendingFlowObjectives = new ArrayList<>();
214
yoonseon86bebed2017-02-03 15:23:57 -0800215 for (Integer nextId : pendingForwards.keySet()) {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700216 Set<PendingFlowObjective> pfwd = pendingForwards.get(nextId);
yoonseon86bebed2017-02-03 15:23:57 -0800217 StringBuilder pend = new StringBuilder();
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700218 pend.append("NextId: ")
219 .append(nextId);
220 for (PendingFlowObjective pf : pfwd) {
221 pend.append("\n FwdId: ")
222 .append(String.format("%11s", pf.flowObjective().id()))
223 .append(", DeviceId: ")
224 .append(pf.deviceId())
225 .append(", Selector: ")
226 .append(((ForwardingObjective) pf.flowObjective())
227 .selector().criteria());
yoonseon86bebed2017-02-03 15:23:57 -0800228 }
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700229 pendingFlowObjectives.add(pend.toString());
yoonseon86bebed2017-02-03 15:23:57 -0800230 }
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700231
232 for (Integer nextId : pendingNexts.keySet()) {
233 Set<PendingFlowObjective> pnext = pendingNexts.get(nextId);
234 StringBuilder pend = new StringBuilder();
235 pend.append("NextId: ")
236 .append(nextId);
237 for (PendingFlowObjective pn : pnext) {
238 pend.append("\n NextOp: ")
239 .append(pn.flowObjective().op())
240 .append(", DeviceId: ")
241 .append(pn.deviceId())
242 .append(", Treatments: ")
243 .append(((NextObjective) pn.flowObjective())
244 .next());
245 }
246 pendingFlowObjectives.add(pend.toString());
247 }
248
249 return pendingFlowObjectives;
yoonseon86bebed2017-02-03 15:23:57 -0800250 }
251
Daniele Moro06aac702021-07-19 22:39:22 +0200252 @Override
253 public void purgeAll(DeviceId deviceId, ApplicationId appId) {
254 // TODO: purge queued flow objectives?
255 pipeliners.get(deviceId).purgeAll(appId);
256 }
257
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700258 private boolean queueFwdObjective(DeviceId deviceId, ForwardingObjective fwd) {
yoonseon86bebed2017-02-03 15:23:57 -0800259 boolean queued = false;
260 synchronized (pendingForwards) {
261 // double check the flow objective store, because this block could run
262 // after a notification arrives
263 if (flowObjectiveStore.getNextGroup(fwd.nextId()) == null) {
264 pendingForwards.compute(fwd.nextId(), (id, pending) -> {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700265 PendingFlowObjective pendfo = new PendingFlowObjective(deviceId, fwd);
yoonseon86bebed2017-02-03 15:23:57 -0800266 if (pending == null) {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700267 return Sets.newHashSet(pendfo);
yoonseon86bebed2017-02-03 15:23:57 -0800268 } else {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700269 pending.add(pendfo);
yoonseon86bebed2017-02-03 15:23:57 -0800270 return pending;
271 }
272 });
273 queued = true;
274 }
275 }
276 if (queued) {
277 log.debug("Queued forwarding objective {} for nextId {} meant for device {}",
278 fwd.id(), fwd.nextId(), deviceId);
279 }
280 return queued;
281 }
282
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700283 private boolean queueNextObjective(DeviceId deviceId, NextObjective next) {
284
285 // we need to hold off on other operations till we get notified that the
286 // initial group creation has succeeded
287 boolean queued = false;
288 synchronized (pendingNexts) {
289 // double check the flow objective store, because this block could run
290 // after a notification arrives
291 if (flowObjectiveStore.getNextGroup(next.id()) == null) {
292 pendingNexts.compute(next.id(), (id, pending) -> {
293 PendingFlowObjective pendfo = new PendingFlowObjective(deviceId, next);
294 if (pending == null) {
295 return Sets.newHashSet(pendfo);
296 } else {
297 pending.add(pendfo);
298 return pending;
299 }
300 });
301 queued = true;
302 }
303 }
304 if (queued) {
305 log.debug("Queued next objective {} with operation {} meant for device {}",
306 next.id(), next.op(), deviceId);
307 }
308 return queued;
309 }
310
yoonseon86bebed2017-02-03 15:23:57 -0800311 /**
312 * Task that passes the flow objective down to the driver. The task will
313 * make a few attempts to find the appropriate driver, then eventually give
314 * up and report an error if no suitable driver could be found.
315 */
316 private class ObjectiveInstaller implements Runnable {
317 private final DeviceId deviceId;
318 private final Objective objective;
319
320 private final int numAttempts;
321
322 public ObjectiveInstaller(DeviceId deviceId, Objective objective) {
323 this(deviceId, objective, 1);
324 }
325
326 public ObjectiveInstaller(DeviceId deviceId, Objective objective, int attemps) {
327 this.deviceId = checkNotNull(deviceId);
328 this.objective = checkNotNull(objective);
Yuta HIGUCHIfbd9ae92018-01-24 23:39:06 -0800329 this.numAttempts = attemps;
yoonseon86bebed2017-02-03 15:23:57 -0800330 }
331
332 @Override
333 public void run() {
334 try {
335 Pipeliner pipeliner = getDevicePipeliner(deviceId);
336
337 if (pipeliner != null) {
338 if (objective instanceof NextObjective) {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700339 nextToDevice.put(objective.id(), deviceId);
yoonseon86bebed2017-02-03 15:23:57 -0800340 pipeliner.next((NextObjective) objective);
341 } else if (objective instanceof ForwardingObjective) {
342 pipeliner.forward((ForwardingObjective) objective);
343 } else {
344 pipeliner.filter((FilteringObjective) objective);
345 }
346 //Attempts to check if pipeliner is null for retry attempts
347 } else if (numAttempts < INSTALL_RETRY_ATTEMPTS) {
348 Thread.sleep(INSTALL_RETRY_INTERVAL);
349 executorService.execute(new ObjectiveInstaller(deviceId, objective, numAttempts + 1));
350 } else {
351 // Otherwise we've tried a few times and failed, report an
352 // error back to the user.
353 objective.context().ifPresent(
354 c -> c.onError(objective, ObjectiveError.NOPIPELINER));
355 }
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700356 //Exception thrown
yoonseon86bebed2017-02-03 15:23:57 -0800357 } catch (Exception e) {
358 log.warn("Exception while installing flow objective", e);
359 }
360 }
361 }
362
363 private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
364 @Override
365 public void notify(ObjectiveEvent event) {
366 if (event.type() == ObjectiveEvent.Type.ADD) {
367 log.debug("Received notification of obj event {}", event);
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700368 Set<PendingFlowObjective> pending;
369
370 // first send all pending flows
yoonseon86bebed2017-02-03 15:23:57 -0800371 synchronized (pendingForwards) {
372 // needs to be synchronized for queueObjective lookup
373 pending = pendingForwards.remove(event.subject());
374 }
yoonseon86bebed2017-02-03 15:23:57 -0800375 if (pending == null) {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700376 log.debug("No forwarding objectives pending for this "
377 + "obj event {}", event);
378 } else {
379 log.debug("Processing {} pending forwarding objectives for nextId {}",
380 pending.size(), event.subject());
381 pending.forEach(p -> getDevicePipeliner(p.deviceId())
382 .forward((ForwardingObjective) p.flowObjective()));
yoonseon86bebed2017-02-03 15:23:57 -0800383 }
384
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700385 // now check for pending next-objectives
386 synchronized (pendingNexts) {
387 // needs to be synchronized for queueObjective lookup
388 pending = pendingNexts.remove(event.subject());
389 }
390 if (pending == null) {
391 log.debug("No next objectives pending for this "
392 + "obj event {}", event);
393 } else {
394 log.debug("Processing {} pending next objectives for nextId {}",
395 pending.size(), event.subject());
396 pending.forEach(p -> getDevicePipeliner(p.deviceId())
397 .next((NextObjective) p.flowObjective()));
398 }
yoonseon86bebed2017-02-03 15:23:57 -0800399 }
400 }
401 }
402
403 /**
404 * Retrieves (if it exists) the device pipeline behaviour from the cache.
405 * Otherwise it warms the caches and triggers the init method of the Pipeline.
406 * For virtual network, it returns OVS pipeliner.
407 *
408 * @param deviceId the id of the device associated to the pipeline
409 * @return the implementation of the Pipeliner behaviour
410 */
411 private Pipeliner getDevicePipeliner(DeviceId deviceId) {
412 return pipeliners.computeIfAbsent(deviceId, this::initPipelineHandler);
413 }
414
415 /**
416 * Creates and initialize {@link Pipeliner}.
417 * <p>
418 * Note: Expected to be called under per-Device lock.
419 * e.g., {@code pipeliners}' Map#compute family methods
420 *
421 * @param deviceId Device to initialize pipeliner
422 * @return {@link Pipeliner} instance or null
423 */
424 private Pipeliner initPipelineHandler(DeviceId deviceId) {
425 //FIXME: do we need a standard pipeline for virtual device?
426 Pipeliner pipeliner = new DefaultVirtualDevicePipeline();
427 pipeliner.init(deviceId, context);
428 return pipeliner;
429 }
430
431 // Processing context for initializing pipeline driver behaviours.
432 private class InnerPipelineContext implements PipelinerContext {
Saurav Das1547b3f2017-05-05 17:01:08 -0700433 @Override
yoonseon86bebed2017-02-03 15:23:57 -0800434 public ServiceDirectory directory() {
435 return serviceDirectory;
436 }
437
Saurav Das1547b3f2017-05-05 17:01:08 -0700438 @Override
yoonseon86bebed2017-02-03 15:23:57 -0800439 public FlowObjectiveStore store() {
440 return flowObjectiveStore;
441 }
442 }
443
444 /**
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700445 * Data class used to hold a pending flow objective that could not
yoonseon86bebed2017-02-03 15:23:57 -0800446 * be processed because the associated next object was not present.
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700447 * Note that this pending flow objective could be a forwarding objective
448 * waiting for a next objective to complete execution. Or it could a
449 * next objective (with a different operation - remove, addToExisting, or
450 * removeFromExisting) waiting for a next objective with the same id to
451 * complete execution.
yoonseon86bebed2017-02-03 15:23:57 -0800452 */
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700453 private class PendingFlowObjective {
yoonseon86bebed2017-02-03 15:23:57 -0800454 private final DeviceId deviceId;
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700455 private final Objective flowObj;
yoonseon86bebed2017-02-03 15:23:57 -0800456
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700457 public PendingFlowObjective(DeviceId deviceId, Objective flowObj) {
yoonseon86bebed2017-02-03 15:23:57 -0800458 this.deviceId = deviceId;
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700459 this.flowObj = flowObj;
yoonseon86bebed2017-02-03 15:23:57 -0800460 }
461
462 public DeviceId deviceId() {
463 return deviceId;
464 }
465
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700466 public Objective flowObjective() {
467 return flowObj;
yoonseon86bebed2017-02-03 15:23:57 -0800468 }
469
470 @Override
471 public int hashCode() {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700472 return Objects.hash(deviceId, flowObj);
yoonseon86bebed2017-02-03 15:23:57 -0800473 }
474
475 @Override
476 public boolean equals(final Object obj) {
477 if (this == obj) {
478 return true;
479 }
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700480 if (!(obj instanceof PendingFlowObjective)) {
yoonseon86bebed2017-02-03 15:23:57 -0800481 return false;
482 }
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700483 final PendingFlowObjective other = (PendingFlowObjective) obj;
yoonseon86bebed2017-02-03 15:23:57 -0800484 if (this.deviceId.equals(other.deviceId) &&
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700485 this.flowObj.equals(other.flowObj)) {
yoonseon86bebed2017-02-03 15:23:57 -0800486 return true;
487 }
488 return false;
489 }
490 }
491
492 /**
493 * This class is a wrapping class from VirtualNetworkFlowObjectiveStore
494 * to FlowObjectiveStore for PipelinerContext.
495 */
496 private class StoreConvertor implements FlowObjectiveStore {
497
498 @Override
499 public void setDelegate(FlowObjectiveStoreDelegate delegate) {
500 virtualFlowObjectiveStore.setDelegate(networkId(), delegate);
501 }
502
503 @Override
504 public void unsetDelegate(FlowObjectiveStoreDelegate delegate) {
505 virtualFlowObjectiveStore.unsetDelegate(networkId(), delegate);
506 }
507
508 @Override
509 public boolean hasDelegate() {
510 return virtualFlowObjectiveStore.hasDelegate(networkId());
511 }
512
513 @Override
514 public void putNextGroup(Integer nextId, NextGroup group) {
515 virtualFlowObjectiveStore.putNextGroup(networkId(), nextId, group);
516 }
517
518 @Override
519 public NextGroup getNextGroup(Integer nextId) {
520 return virtualFlowObjectiveStore.getNextGroup(networkId(), nextId);
521 }
522
523 @Override
524 public NextGroup removeNextGroup(Integer nextId) {
525 return virtualFlowObjectiveStore.removeNextGroup(networkId(), nextId);
526 }
527
528 @Override
529 public Map<Integer, NextGroup> getAllGroups() {
530 return virtualFlowObjectiveStore.getAllGroups(networkId());
531 }
532
533 @Override
534 public int allocateNextId() {
535 return virtualFlowObjectiveStore.allocateNextId(networkId());
536 }
537 }
538
539 /**
540 * Simple single table pipeline abstraction for virtual networks.
541 */
542 private class DefaultVirtualDevicePipeline
543 extends AbstractHandlerBehaviour implements Pipeliner {
544
545 private final Logger log = getLogger(getClass());
546
547 private DeviceId deviceId;
548
549 private Cache<Integer, NextObjective> pendingNext;
550
551 private KryoNamespace appKryo = new KryoNamespace.Builder()
552 .register(GroupKey.class)
553 .register(DefaultGroupKey.class)
554 .register(SingleGroup.class)
555 .register(byte[].class)
556 .build("DefaultVirtualDevicePipeline");
557
558 @Override
559 public void init(DeviceId deviceId, PipelinerContext context) {
560 this.deviceId = deviceId;
561
562 pendingNext = CacheBuilder.newBuilder()
563 .expireAfterWrite(20, TimeUnit.SECONDS)
564 .removalListener((RemovalNotification<Integer, NextObjective> notification) -> {
565 if (notification.getCause() == RemovalCause.EXPIRED) {
566 notification.getValue().context()
567 .ifPresent(c -> c.onError(notification.getValue(),
568 ObjectiveError.FLOWINSTALLATIONFAILED));
569 }
570 }).build();
571 }
572
573 @Override
574 public void filter(FilteringObjective filter) {
575
576 TrafficTreatment.Builder actions;
577 switch (filter.type()) {
578 case PERMIT:
579 actions = (filter.meta() == null) ?
580 DefaultTrafficTreatment.builder().punt() :
581 DefaultTrafficTreatment.builder(filter.meta());
582 break;
583 case DENY:
584 actions = (filter.meta() == null) ?
585 DefaultTrafficTreatment.builder() :
586 DefaultTrafficTreatment.builder(filter.meta());
587 actions.drop();
588 break;
589 default:
590 log.warn("Unknown filter type: {}", filter.type());
591 actions = DefaultTrafficTreatment.builder().drop();
592 }
593
594 TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
595
596 filter.conditions().forEach(selector::add);
597
598 if (filter.key() != null) {
599 selector.add(filter.key());
600 }
601
602 FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
603 .forDevice(deviceId)
604 .withSelector(selector.build())
605 .withTreatment(actions.build())
606 .fromApp(filter.appId())
607 .withPriority(filter.priority());
608
609 if (filter.permanent()) {
610 ruleBuilder.makePermanent();
611 } else {
612 ruleBuilder.makeTemporary(filter.timeout());
613 }
614
615 installObjective(ruleBuilder, filter);
616 }
617
618 @Override
619 public void forward(ForwardingObjective fwd) {
620 TrafficSelector selector = fwd.selector();
621
622 if (fwd.treatment() != null) {
623 // Deal with SPECIFIC and VERSATILE in the same manner.
624 FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
625 .forDevice(deviceId)
626 .withSelector(selector)
627 .fromApp(fwd.appId())
628 .withPriority(fwd.priority())
629 .withTreatment(fwd.treatment());
630
631 if (fwd.permanent()) {
632 ruleBuilder.makePermanent();
633 } else {
634 ruleBuilder.makeTemporary(fwd.timeout());
635 }
636 installObjective(ruleBuilder, fwd);
637
638 } else {
639 NextObjective nextObjective = pendingNext.getIfPresent(fwd.nextId());
640 if (nextObjective != null) {
641 pendingNext.invalidate(fwd.nextId());
642 nextObjective.next().forEach(treat -> {
643 FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
644 .forDevice(deviceId)
645 .withSelector(selector)
646 .fromApp(fwd.appId())
647 .withPriority(fwd.priority())
648 .withTreatment(treat);
649
650 if (fwd.permanent()) {
651 ruleBuilder.makePermanent();
652 } else {
653 ruleBuilder.makeTemporary(fwd.timeout());
654 }
655 installObjective(ruleBuilder, fwd);
656 });
657 } else {
658 fwd.context().ifPresent(c -> c.onError(fwd,
659 ObjectiveError.GROUPMISSING));
660 }
661 }
662 }
663
664 private void installObjective(FlowRule.Builder ruleBuilder, Objective objective) {
665 FlowRuleOperations.Builder flowBuilder = FlowRuleOperations.builder();
666 switch (objective.op()) {
667
668 case ADD:
669 flowBuilder.add(ruleBuilder.build());
670 break;
671 case REMOVE:
672 flowBuilder.remove(ruleBuilder.build());
673 break;
674 default:
675 log.warn("Unknown operation {}", objective.op());
676 }
677
678 flowRuleService.apply(flowBuilder.build(new FlowRuleOperationsContext() {
679 @Override
680 public void onSuccess(FlowRuleOperations ops) {
681 objective.context().ifPresent(context -> context.onSuccess(objective));
682 }
683
684 @Override
685 public void onError(FlowRuleOperations ops) {
686 objective.context()
687 .ifPresent(context ->
688 context.onError(objective,
689 ObjectiveError.FLOWINSTALLATIONFAILED));
690 }
691 }));
692 }
693
694 @Override
695 public void next(NextObjective nextObjective) {
696
697 pendingNext.put(nextObjective.id(), nextObjective);
698 flowObjectiveStore.putNextGroup(nextObjective.id(),
699 new SingleGroup(
700 new DefaultGroupKey(
701 appKryo.serialize(nextObjective.id()))));
702 nextObjective.context().ifPresent(context -> context.onSuccess(nextObjective));
703 }
704
705 @Override
Daniele Moro06aac702021-07-19 22:39:22 +0200706 public void purgeAll(ApplicationId appId) {
707 flowRuleService.purgeFlowRules(deviceId, appId);
708 }
709
710 @Override
yoonseon86bebed2017-02-03 15:23:57 -0800711 public List<String> getNextMappings(NextGroup nextGroup) {
712 // Default single table pipeline does not use nextObjectives or groups
713 return null;
714 }
715
716 private class SingleGroup implements NextGroup {
717
718 private final GroupKey key;
719
720 public SingleGroup(GroupKey key) {
721 this.key = key;
722 }
723
724 public GroupKey key() {
725 return key;
726 }
727
728 @Override
729 public byte[] data() {
730 return appKryo.serialize(key);
731 }
732 }
733 }
Saurav Das1547b3f2017-05-05 17:01:08 -0700734
yoonseon86bebed2017-02-03 15:23:57 -0800735}