blob: 84ce3ebc0853de7defc895d308fa1227a1207afe [file] [log] [blame]
yoonseon86bebed2017-02-03 15:23:57 -08001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
yoonseon86bebed2017-02-03 15:23:57 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.incubator.net.virtual.impl;
18
19import com.google.common.cache.Cache;
20import com.google.common.cache.CacheBuilder;
21import com.google.common.cache.RemovalCause;
22import com.google.common.cache.RemovalNotification;
23import com.google.common.collect.Maps;
24import com.google.common.collect.Sets;
25import org.onlab.osgi.ServiceDirectory;
26import org.onlab.util.KryoNamespace;
27import org.onosproject.incubator.net.virtual.AbstractVnetService;
28import org.onosproject.incubator.net.virtual.NetworkId;
29import org.onosproject.incubator.net.virtual.VirtualNetworkFlowObjectiveStore;
30import org.onosproject.incubator.net.virtual.VirtualNetworkService;
31import org.onosproject.net.DeviceId;
32import org.onosproject.net.behaviour.NextGroup;
33import org.onosproject.net.behaviour.Pipeliner;
34import org.onosproject.net.behaviour.PipelinerContext;
35import org.onosproject.net.device.DeviceService;
36import org.onosproject.net.driver.AbstractHandlerBehaviour;
37import org.onosproject.net.flow.DefaultFlowRule;
38import org.onosproject.net.flow.DefaultTrafficSelector;
39import org.onosproject.net.flow.DefaultTrafficTreatment;
40import org.onosproject.net.flow.FlowRule;
41import org.onosproject.net.flow.FlowRuleOperations;
42import org.onosproject.net.flow.FlowRuleOperationsContext;
43import org.onosproject.net.flow.FlowRuleService;
44import org.onosproject.net.flow.TrafficSelector;
45import org.onosproject.net.flow.TrafficTreatment;
46import org.onosproject.net.flowobjective.FilteringObjective;
47import org.onosproject.net.flowobjective.FlowObjectiveService;
48import org.onosproject.net.flowobjective.FlowObjectiveStore;
49import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
50import org.onosproject.net.flowobjective.ForwardingObjective;
51import org.onosproject.net.flowobjective.NextObjective;
52import org.onosproject.net.flowobjective.Objective;
53import org.onosproject.net.flowobjective.ObjectiveError;
54import org.onosproject.net.flowobjective.ObjectiveEvent;
55import org.onosproject.net.group.DefaultGroupKey;
56import org.onosproject.net.group.GroupKey;
57import org.slf4j.Logger;
58
59import java.util.ArrayList;
60import java.util.List;
61import java.util.Map;
62import java.util.Objects;
63import java.util.Set;
64import java.util.concurrent.ExecutorService;
65import java.util.concurrent.TimeUnit;
66
67import static com.google.common.base.Preconditions.checkNotNull;
68import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
69import static org.onlab.util.Tools.groupedThreads;
70import static org.slf4j.LoggerFactory.getLogger;
71
72/**
73 * Provides implementation of the flow objective programming service for virtual networks.
74 */
75// NOTE: This manager is designed to provide flow objective programming service
76// for virtual networks. Actually, virtual networks don't need to consider
77// the different implementation of data-path pipeline. But, the interfaces
78// and usages of flow objective service are still valuable for virtual network.
79// This manager is working as an interpreter from FlowObjective to FlowRules
80// to provide symmetric interfaces with ONOS core services.
81// The behaviours are based on DefaultSingleTablePipeline.
82
83public class VirtualNetworkFlowObjectiveManager extends AbstractVnetService
84 implements FlowObjectiveService {
85
86 public static final int INSTALL_RETRY_ATTEMPTS = 5;
87 public static final long INSTALL_RETRY_INTERVAL = 1000; // ms
88
89 private final Logger log = getLogger(getClass());
90
91 protected DeviceService deviceService;
92
93 // Note: The following dependencies are added on behalf of the pipeline
94 // driver behaviours to assure these services are available for their
95 // initialization.
96 protected FlowRuleService flowRuleService;
97
98 protected VirtualNetworkFlowObjectiveStore virtualFlowObjectiveStore;
99 protected FlowObjectiveStore flowObjectiveStore;
100 private final FlowObjectiveStoreDelegate delegate;
101
102 private final PipelinerContext context = new InnerPipelineContext();
103
104 private final Map<DeviceId, Pipeliner> pipeliners = Maps.newConcurrentMap();
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700105
106 // local stores for queuing fwd and next objectives that are waiting for an
107 // associated next objective execution to complete. The signal for completed
108 // execution comes from a pipeline driver, in this or another controller
109 // instance, via the DistributedFlowObjectiveStore.
110 private final Map<Integer, Set<PendingFlowObjective>> pendingForwards =
111 Maps.newConcurrentMap();
112 private final Map<Integer, Set<PendingFlowObjective>> pendingNexts =
113 Maps.newConcurrentMap();
yoonseon86bebed2017-02-03 15:23:57 -0800114
115 // local store to track which nextObjectives were sent to which device
116 // for debugging purposes
117 private Map<Integer, DeviceId> nextToDevice = Maps.newConcurrentMap();
118
119 private ExecutorService executorService;
120
121 public VirtualNetworkFlowObjectiveManager(VirtualNetworkService manager,
122 NetworkId networkId) {
123 super(manager, networkId);
124
125 deviceService = manager.get(networkId(), DeviceService.class);
126 flowRuleService = manager.get(networkId(), FlowRuleService.class);
127
128 executorService = newFixedThreadPool(4, groupedThreads("onos/virtual/objective-installer", "%d", log));
129
130 virtualFlowObjectiveStore =
131 serviceDirectory.get(VirtualNetworkFlowObjectiveStore.class);
132 delegate = new InternalStoreDelegate();
133 virtualFlowObjectiveStore.setDelegate(networkId(), delegate);
134 flowObjectiveStore = new StoreConvertor();
135 }
136
137 @Override
138 public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
139 executorService.execute(new ObjectiveInstaller(deviceId, filteringObjective));
140 }
141
142 @Override
143 public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700144 if (forwardingObjective.nextId() == null ||
145 forwardingObjective.op() == Objective.Operation.REMOVE ||
146 flowObjectiveStore.getNextGroup(forwardingObjective.nextId()) != null ||
147 !queueFwdObjective(deviceId, forwardingObjective)) {
148 // fast path
149 executorService.execute(new ObjectiveInstaller(deviceId, forwardingObjective));
yoonseon86bebed2017-02-03 15:23:57 -0800150 }
yoonseon86bebed2017-02-03 15:23:57 -0800151 }
152
153 @Override
154 public void next(DeviceId deviceId, NextObjective nextObjective) {
155 nextToDevice.put(nextObjective.id(), deviceId);
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700156 if (nextObjective.op() == Objective.Operation.ADD ||
157 flowObjectiveStore.getNextGroup(nextObjective.id()) != null ||
158 !queueNextObjective(deviceId, nextObjective)) {
159 // either group exists or we are trying to create it - let it through
160 executorService.execute(new ObjectiveInstaller(deviceId, nextObjective));
161 }
yoonseon86bebed2017-02-03 15:23:57 -0800162 }
163
164 @Override
165 public int allocateNextId() {
166 return flowObjectiveStore.allocateNextId();
167 }
168
169 @Override
170 public void initPolicy(String policy) {
171
172 }
173
174 @Override
175 public List<String> getNextMappings() {
176 List<String> mappings = new ArrayList<>();
177 Map<Integer, NextGroup> allnexts = flowObjectiveStore.getAllGroups();
178 // XXX if the NextGroup after de-serialization actually stored info of the deviceId
179 // then info on any nextObj could be retrieved from one controller instance.
180 // Right now the drivers on one instance can only fetch for next-ids that came
181 // to them.
182 // Also, we still need to send the right next-id to the right driver as potentially
183 // there can be different drivers for different devices. But on that account,
184 // no instance should be decoding for another instance's nextIds.
185
186 for (Map.Entry<Integer, NextGroup> e : allnexts.entrySet()) {
187 // get the device this next Objective was sent to
188 DeviceId deviceId = nextToDevice.get(e.getKey());
189 mappings.add("NextId " + e.getKey() + ": " +
190 ((deviceId != null) ? deviceId : "nextId not in this onos instance"));
191 if (deviceId != null) {
192 // this instance of the controller sent the nextObj to a driver
193 Pipeliner pipeliner = getDevicePipeliner(deviceId);
194 List<String> nextMappings = pipeliner.getNextMappings(e.getValue());
195 if (nextMappings != null) {
196 mappings.addAll(nextMappings);
197 }
198 }
199 }
200 return mappings;
201 }
202
203 @Override
Saurav Das1547b3f2017-05-05 17:01:08 -0700204 public List<String> getPendingFlowObjectives() {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700205 List<String> pendingFlowObjectives = new ArrayList<>();
206
yoonseon86bebed2017-02-03 15:23:57 -0800207 for (Integer nextId : pendingForwards.keySet()) {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700208 Set<PendingFlowObjective> pfwd = pendingForwards.get(nextId);
yoonseon86bebed2017-02-03 15:23:57 -0800209 StringBuilder pend = new StringBuilder();
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700210 pend.append("NextId: ")
211 .append(nextId);
212 for (PendingFlowObjective pf : pfwd) {
213 pend.append("\n FwdId: ")
214 .append(String.format("%11s", pf.flowObjective().id()))
215 .append(", DeviceId: ")
216 .append(pf.deviceId())
217 .append(", Selector: ")
218 .append(((ForwardingObjective) pf.flowObjective())
219 .selector().criteria());
yoonseon86bebed2017-02-03 15:23:57 -0800220 }
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700221 pendingFlowObjectives.add(pend.toString());
yoonseon86bebed2017-02-03 15:23:57 -0800222 }
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700223
224 for (Integer nextId : pendingNexts.keySet()) {
225 Set<PendingFlowObjective> pnext = pendingNexts.get(nextId);
226 StringBuilder pend = new StringBuilder();
227 pend.append("NextId: ")
228 .append(nextId);
229 for (PendingFlowObjective pn : pnext) {
230 pend.append("\n NextOp: ")
231 .append(pn.flowObjective().op())
232 .append(", DeviceId: ")
233 .append(pn.deviceId())
234 .append(", Treatments: ")
235 .append(((NextObjective) pn.flowObjective())
236 .next());
237 }
238 pendingFlowObjectives.add(pend.toString());
239 }
240
241 return pendingFlowObjectives;
yoonseon86bebed2017-02-03 15:23:57 -0800242 }
243
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700244 private boolean queueFwdObjective(DeviceId deviceId, ForwardingObjective fwd) {
yoonseon86bebed2017-02-03 15:23:57 -0800245 boolean queued = false;
246 synchronized (pendingForwards) {
247 // double check the flow objective store, because this block could run
248 // after a notification arrives
249 if (flowObjectiveStore.getNextGroup(fwd.nextId()) == null) {
250 pendingForwards.compute(fwd.nextId(), (id, pending) -> {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700251 PendingFlowObjective pendfo = new PendingFlowObjective(deviceId, fwd);
yoonseon86bebed2017-02-03 15:23:57 -0800252 if (pending == null) {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700253 return Sets.newHashSet(pendfo);
yoonseon86bebed2017-02-03 15:23:57 -0800254 } else {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700255 pending.add(pendfo);
yoonseon86bebed2017-02-03 15:23:57 -0800256 return pending;
257 }
258 });
259 queued = true;
260 }
261 }
262 if (queued) {
263 log.debug("Queued forwarding objective {} for nextId {} meant for device {}",
264 fwd.id(), fwd.nextId(), deviceId);
265 }
266 return queued;
267 }
268
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700269 private boolean queueNextObjective(DeviceId deviceId, NextObjective next) {
270
271 // we need to hold off on other operations till we get notified that the
272 // initial group creation has succeeded
273 boolean queued = false;
274 synchronized (pendingNexts) {
275 // double check the flow objective store, because this block could run
276 // after a notification arrives
277 if (flowObjectiveStore.getNextGroup(next.id()) == null) {
278 pendingNexts.compute(next.id(), (id, pending) -> {
279 PendingFlowObjective pendfo = new PendingFlowObjective(deviceId, next);
280 if (pending == null) {
281 return Sets.newHashSet(pendfo);
282 } else {
283 pending.add(pendfo);
284 return pending;
285 }
286 });
287 queued = true;
288 }
289 }
290 if (queued) {
291 log.debug("Queued next objective {} with operation {} meant for device {}",
292 next.id(), next.op(), deviceId);
293 }
294 return queued;
295 }
296
yoonseon86bebed2017-02-03 15:23:57 -0800297 /**
298 * Task that passes the flow objective down to the driver. The task will
299 * make a few attempts to find the appropriate driver, then eventually give
300 * up and report an error if no suitable driver could be found.
301 */
302 private class ObjectiveInstaller implements Runnable {
303 private final DeviceId deviceId;
304 private final Objective objective;
305
306 private final int numAttempts;
307
308 public ObjectiveInstaller(DeviceId deviceId, Objective objective) {
309 this(deviceId, objective, 1);
310 }
311
312 public ObjectiveInstaller(DeviceId deviceId, Objective objective, int attemps) {
313 this.deviceId = checkNotNull(deviceId);
314 this.objective = checkNotNull(objective);
Yuta HIGUCHIfbd9ae92018-01-24 23:39:06 -0800315 this.numAttempts = attemps;
yoonseon86bebed2017-02-03 15:23:57 -0800316 }
317
318 @Override
319 public void run() {
320 try {
321 Pipeliner pipeliner = getDevicePipeliner(deviceId);
322
323 if (pipeliner != null) {
324 if (objective instanceof NextObjective) {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700325 nextToDevice.put(objective.id(), deviceId);
yoonseon86bebed2017-02-03 15:23:57 -0800326 pipeliner.next((NextObjective) objective);
327 } else if (objective instanceof ForwardingObjective) {
328 pipeliner.forward((ForwardingObjective) objective);
329 } else {
330 pipeliner.filter((FilteringObjective) objective);
331 }
332 //Attempts to check if pipeliner is null for retry attempts
333 } else if (numAttempts < INSTALL_RETRY_ATTEMPTS) {
334 Thread.sleep(INSTALL_RETRY_INTERVAL);
335 executorService.execute(new ObjectiveInstaller(deviceId, objective, numAttempts + 1));
336 } else {
337 // Otherwise we've tried a few times and failed, report an
338 // error back to the user.
339 objective.context().ifPresent(
340 c -> c.onError(objective, ObjectiveError.NOPIPELINER));
341 }
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700342 //Exception thrown
yoonseon86bebed2017-02-03 15:23:57 -0800343 } catch (Exception e) {
344 log.warn("Exception while installing flow objective", e);
345 }
346 }
347 }
348
349 private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
350 @Override
351 public void notify(ObjectiveEvent event) {
352 if (event.type() == ObjectiveEvent.Type.ADD) {
353 log.debug("Received notification of obj event {}", event);
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700354 Set<PendingFlowObjective> pending;
355
356 // first send all pending flows
yoonseon86bebed2017-02-03 15:23:57 -0800357 synchronized (pendingForwards) {
358 // needs to be synchronized for queueObjective lookup
359 pending = pendingForwards.remove(event.subject());
360 }
yoonseon86bebed2017-02-03 15:23:57 -0800361 if (pending == null) {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700362 log.debug("No forwarding objectives pending for this "
363 + "obj event {}", event);
364 } else {
365 log.debug("Processing {} pending forwarding objectives for nextId {}",
366 pending.size(), event.subject());
367 pending.forEach(p -> getDevicePipeliner(p.deviceId())
368 .forward((ForwardingObjective) p.flowObjective()));
yoonseon86bebed2017-02-03 15:23:57 -0800369 }
370
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700371 // now check for pending next-objectives
372 synchronized (pendingNexts) {
373 // needs to be synchronized for queueObjective lookup
374 pending = pendingNexts.remove(event.subject());
375 }
376 if (pending == null) {
377 log.debug("No next objectives pending for this "
378 + "obj event {}", event);
379 } else {
380 log.debug("Processing {} pending next objectives for nextId {}",
381 pending.size(), event.subject());
382 pending.forEach(p -> getDevicePipeliner(p.deviceId())
383 .next((NextObjective) p.flowObjective()));
384 }
yoonseon86bebed2017-02-03 15:23:57 -0800385 }
386 }
387 }
388
389 /**
390 * Retrieves (if it exists) the device pipeline behaviour from the cache.
391 * Otherwise it warms the caches and triggers the init method of the Pipeline.
392 * For virtual network, it returns OVS pipeliner.
393 *
394 * @param deviceId the id of the device associated to the pipeline
395 * @return the implementation of the Pipeliner behaviour
396 */
397 private Pipeliner getDevicePipeliner(DeviceId deviceId) {
398 return pipeliners.computeIfAbsent(deviceId, this::initPipelineHandler);
399 }
400
401 /**
402 * Creates and initialize {@link Pipeliner}.
403 * <p>
404 * Note: Expected to be called under per-Device lock.
405 * e.g., {@code pipeliners}' Map#compute family methods
406 *
407 * @param deviceId Device to initialize pipeliner
408 * @return {@link Pipeliner} instance or null
409 */
410 private Pipeliner initPipelineHandler(DeviceId deviceId) {
411 //FIXME: do we need a standard pipeline for virtual device?
412 Pipeliner pipeliner = new DefaultVirtualDevicePipeline();
413 pipeliner.init(deviceId, context);
414 return pipeliner;
415 }
416
417 // Processing context for initializing pipeline driver behaviours.
418 private class InnerPipelineContext implements PipelinerContext {
Saurav Das1547b3f2017-05-05 17:01:08 -0700419 @Override
yoonseon86bebed2017-02-03 15:23:57 -0800420 public ServiceDirectory directory() {
421 return serviceDirectory;
422 }
423
Saurav Das1547b3f2017-05-05 17:01:08 -0700424 @Override
yoonseon86bebed2017-02-03 15:23:57 -0800425 public FlowObjectiveStore store() {
426 return flowObjectiveStore;
427 }
428 }
429
430 /**
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700431 * Data class used to hold a pending flow objective that could not
yoonseon86bebed2017-02-03 15:23:57 -0800432 * be processed because the associated next object was not present.
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700433 * Note that this pending flow objective could be a forwarding objective
434 * waiting for a next objective to complete execution. Or it could a
435 * next objective (with a different operation - remove, addToExisting, or
436 * removeFromExisting) waiting for a next objective with the same id to
437 * complete execution.
yoonseon86bebed2017-02-03 15:23:57 -0800438 */
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700439 private class PendingFlowObjective {
yoonseon86bebed2017-02-03 15:23:57 -0800440 private final DeviceId deviceId;
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700441 private final Objective flowObj;
yoonseon86bebed2017-02-03 15:23:57 -0800442
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700443 public PendingFlowObjective(DeviceId deviceId, Objective flowObj) {
yoonseon86bebed2017-02-03 15:23:57 -0800444 this.deviceId = deviceId;
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700445 this.flowObj = flowObj;
yoonseon86bebed2017-02-03 15:23:57 -0800446 }
447
448 public DeviceId deviceId() {
449 return deviceId;
450 }
451
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700452 public Objective flowObjective() {
453 return flowObj;
yoonseon86bebed2017-02-03 15:23:57 -0800454 }
455
456 @Override
457 public int hashCode() {
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700458 return Objects.hash(deviceId, flowObj);
yoonseon86bebed2017-02-03 15:23:57 -0800459 }
460
461 @Override
462 public boolean equals(final Object obj) {
463 if (this == obj) {
464 return true;
465 }
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700466 if (!(obj instanceof PendingFlowObjective)) {
yoonseon86bebed2017-02-03 15:23:57 -0800467 return false;
468 }
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700469 final PendingFlowObjective other = (PendingFlowObjective) obj;
yoonseon86bebed2017-02-03 15:23:57 -0800470 if (this.deviceId.equals(other.deviceId) &&
Yoonseon Hanab7a1f62017-05-18 15:31:09 -0700471 this.flowObj.equals(other.flowObj)) {
yoonseon86bebed2017-02-03 15:23:57 -0800472 return true;
473 }
474 return false;
475 }
476 }
477
478 /**
479 * This class is a wrapping class from VirtualNetworkFlowObjectiveStore
480 * to FlowObjectiveStore for PipelinerContext.
481 */
482 private class StoreConvertor implements FlowObjectiveStore {
483
484 @Override
485 public void setDelegate(FlowObjectiveStoreDelegate delegate) {
486 virtualFlowObjectiveStore.setDelegate(networkId(), delegate);
487 }
488
489 @Override
490 public void unsetDelegate(FlowObjectiveStoreDelegate delegate) {
491 virtualFlowObjectiveStore.unsetDelegate(networkId(), delegate);
492 }
493
494 @Override
495 public boolean hasDelegate() {
496 return virtualFlowObjectiveStore.hasDelegate(networkId());
497 }
498
499 @Override
500 public void putNextGroup(Integer nextId, NextGroup group) {
501 virtualFlowObjectiveStore.putNextGroup(networkId(), nextId, group);
502 }
503
504 @Override
505 public NextGroup getNextGroup(Integer nextId) {
506 return virtualFlowObjectiveStore.getNextGroup(networkId(), nextId);
507 }
508
509 @Override
510 public NextGroup removeNextGroup(Integer nextId) {
511 return virtualFlowObjectiveStore.removeNextGroup(networkId(), nextId);
512 }
513
514 @Override
515 public Map<Integer, NextGroup> getAllGroups() {
516 return virtualFlowObjectiveStore.getAllGroups(networkId());
517 }
518
519 @Override
520 public int allocateNextId() {
521 return virtualFlowObjectiveStore.allocateNextId(networkId());
522 }
523 }
524
525 /**
526 * Simple single table pipeline abstraction for virtual networks.
527 */
528 private class DefaultVirtualDevicePipeline
529 extends AbstractHandlerBehaviour implements Pipeliner {
530
531 private final Logger log = getLogger(getClass());
532
533 private DeviceId deviceId;
534
535 private Cache<Integer, NextObjective> pendingNext;
536
537 private KryoNamespace appKryo = new KryoNamespace.Builder()
538 .register(GroupKey.class)
539 .register(DefaultGroupKey.class)
540 .register(SingleGroup.class)
541 .register(byte[].class)
542 .build("DefaultVirtualDevicePipeline");
543
544 @Override
545 public void init(DeviceId deviceId, PipelinerContext context) {
546 this.deviceId = deviceId;
547
548 pendingNext = CacheBuilder.newBuilder()
549 .expireAfterWrite(20, TimeUnit.SECONDS)
550 .removalListener((RemovalNotification<Integer, NextObjective> notification) -> {
551 if (notification.getCause() == RemovalCause.EXPIRED) {
552 notification.getValue().context()
553 .ifPresent(c -> c.onError(notification.getValue(),
554 ObjectiveError.FLOWINSTALLATIONFAILED));
555 }
556 }).build();
557 }
558
559 @Override
560 public void filter(FilteringObjective filter) {
561
562 TrafficTreatment.Builder actions;
563 switch (filter.type()) {
564 case PERMIT:
565 actions = (filter.meta() == null) ?
566 DefaultTrafficTreatment.builder().punt() :
567 DefaultTrafficTreatment.builder(filter.meta());
568 break;
569 case DENY:
570 actions = (filter.meta() == null) ?
571 DefaultTrafficTreatment.builder() :
572 DefaultTrafficTreatment.builder(filter.meta());
573 actions.drop();
574 break;
575 default:
576 log.warn("Unknown filter type: {}", filter.type());
577 actions = DefaultTrafficTreatment.builder().drop();
578 }
579
580 TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
581
582 filter.conditions().forEach(selector::add);
583
584 if (filter.key() != null) {
585 selector.add(filter.key());
586 }
587
588 FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
589 .forDevice(deviceId)
590 .withSelector(selector.build())
591 .withTreatment(actions.build())
592 .fromApp(filter.appId())
593 .withPriority(filter.priority());
594
595 if (filter.permanent()) {
596 ruleBuilder.makePermanent();
597 } else {
598 ruleBuilder.makeTemporary(filter.timeout());
599 }
600
601 installObjective(ruleBuilder, filter);
602 }
603
604 @Override
605 public void forward(ForwardingObjective fwd) {
606 TrafficSelector selector = fwd.selector();
607
608 if (fwd.treatment() != null) {
609 // Deal with SPECIFIC and VERSATILE in the same manner.
610 FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
611 .forDevice(deviceId)
612 .withSelector(selector)
613 .fromApp(fwd.appId())
614 .withPriority(fwd.priority())
615 .withTreatment(fwd.treatment());
616
617 if (fwd.permanent()) {
618 ruleBuilder.makePermanent();
619 } else {
620 ruleBuilder.makeTemporary(fwd.timeout());
621 }
622 installObjective(ruleBuilder, fwd);
623
624 } else {
625 NextObjective nextObjective = pendingNext.getIfPresent(fwd.nextId());
626 if (nextObjective != null) {
627 pendingNext.invalidate(fwd.nextId());
628 nextObjective.next().forEach(treat -> {
629 FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
630 .forDevice(deviceId)
631 .withSelector(selector)
632 .fromApp(fwd.appId())
633 .withPriority(fwd.priority())
634 .withTreatment(treat);
635
636 if (fwd.permanent()) {
637 ruleBuilder.makePermanent();
638 } else {
639 ruleBuilder.makeTemporary(fwd.timeout());
640 }
641 installObjective(ruleBuilder, fwd);
642 });
643 } else {
644 fwd.context().ifPresent(c -> c.onError(fwd,
645 ObjectiveError.GROUPMISSING));
646 }
647 }
648 }
649
650 private void installObjective(FlowRule.Builder ruleBuilder, Objective objective) {
651 FlowRuleOperations.Builder flowBuilder = FlowRuleOperations.builder();
652 switch (objective.op()) {
653
654 case ADD:
655 flowBuilder.add(ruleBuilder.build());
656 break;
657 case REMOVE:
658 flowBuilder.remove(ruleBuilder.build());
659 break;
660 default:
661 log.warn("Unknown operation {}", objective.op());
662 }
663
664 flowRuleService.apply(flowBuilder.build(new FlowRuleOperationsContext() {
665 @Override
666 public void onSuccess(FlowRuleOperations ops) {
667 objective.context().ifPresent(context -> context.onSuccess(objective));
668 }
669
670 @Override
671 public void onError(FlowRuleOperations ops) {
672 objective.context()
673 .ifPresent(context ->
674 context.onError(objective,
675 ObjectiveError.FLOWINSTALLATIONFAILED));
676 }
677 }));
678 }
679
680 @Override
681 public void next(NextObjective nextObjective) {
682
683 pendingNext.put(nextObjective.id(), nextObjective);
684 flowObjectiveStore.putNextGroup(nextObjective.id(),
685 new SingleGroup(
686 new DefaultGroupKey(
687 appKryo.serialize(nextObjective.id()))));
688 nextObjective.context().ifPresent(context -> context.onSuccess(nextObjective));
689 }
690
691 @Override
692 public List<String> getNextMappings(NextGroup nextGroup) {
693 // Default single table pipeline does not use nextObjectives or groups
694 return null;
695 }
696
697 private class SingleGroup implements NextGroup {
698
699 private final GroupKey key;
700
701 public SingleGroup(GroupKey key) {
702 this.key = key;
703 }
704
705 public GroupKey key() {
706 return key;
707 }
708
709 @Override
710 public byte[] data() {
711 return appKryo.serialize(key);
712 }
713 }
714 }
Saurav Das1547b3f2017-05-05 17:01:08 -0700715
yoonseon86bebed2017-02-03 15:23:57 -0800716}