blob: f32bb6cf50ca1e910505fb7302bbde06bc201d6d [file] [log] [blame]
Charles Chana7903c82018-03-15 20:14:16 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.net.flowobjective.impl;
18
Charles Chan45c19d72018-04-19 21:38:40 -070019import com.google.common.cache.Cache;
20import com.google.common.cache.CacheBuilder;
21import com.google.common.cache.RemovalNotification;
Charles Chana7903c82018-03-15 20:14:16 -070022import com.google.common.collect.ArrayListMultimap;
23import com.google.common.collect.ListMultimap;
24import com.google.common.collect.Multimaps;
25import org.apache.felix.scr.annotations.Activate;
26import org.apache.felix.scr.annotations.Component;
27import org.apache.felix.scr.annotations.Deactivate;
28import org.apache.felix.scr.annotations.Service;
Charles Chanc09ad6d2018-04-04 16:31:23 -070029import org.onlab.util.Tools;
30import org.onlab.util.Tools.LogLevel;
Charles Chana7903c82018-03-15 20:14:16 -070031import org.onosproject.net.DeviceId;
32import org.onosproject.net.flow.TrafficSelector;
33import org.onosproject.net.flow.criteria.Criterion;
34import org.onosproject.net.flowobjective.FilteringObjective;
35import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
36import org.onosproject.net.flowobjective.ForwardingObjective;
37import org.onosproject.net.flowobjective.NextObjective;
38import org.onosproject.net.flowobjective.Objective;
39import org.onosproject.net.flowobjective.ObjectiveContext;
40import org.onosproject.net.flowobjective.ObjectiveError;
41import org.onosproject.net.flowobjective.ObjectiveEvent;
42import org.slf4j.Logger;
43import org.slf4j.LoggerFactory;
44
45import java.util.List;
46import java.util.Objects;
47import java.util.Optional;
48import java.util.Set;
Charles Chan45c19d72018-04-19 21:38:40 -070049import java.util.concurrent.ScheduledExecutorService;
50import java.util.concurrent.TimeUnit;
51
52import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
53import static org.onlab.util.Tools.groupedThreads;
Charles Chana7903c82018-03-15 20:14:16 -070054
55@Component(immediate = true, enabled = true)
56@Service
57public class InOrderFlowObjectiveManager extends FlowObjectiveManager {
58 private final Logger log = LoggerFactory.getLogger(getClass());
59
Charles Chan45c19d72018-04-19 21:38:40 -070060 // TODO Make queue timeout configurable
61 static final int OBJ_TIMEOUT_MS = 5000;
62
63 private Cache<FiltObjQueueKey, Objective> filtObjQueueHead;
64 private Cache<FwdObjQueueKey, Objective> fwdObjQueueHead;
65 private Cache<NextObjQueueKey, Objective> nextObjQueueHead;
66 private ScheduledExecutorService cacheCleaner;
67
Charles Chana7903c82018-03-15 20:14:16 -070068 private ListMultimap<FiltObjQueueKey, Objective> filtObjQueue =
69 Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
70 private ListMultimap<FwdObjQueueKey, Objective> fwdObjQueue =
71 Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
72 private ListMultimap<NextObjQueueKey, Objective> nextObjQueue =
73 Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
74
75 final FlowObjectiveStoreDelegate delegate = new InternalStoreDelegate();
76
77 @Activate
78 protected void activate() {
79 super.activate();
Charles Chan45c19d72018-04-19 21:38:40 -070080
81 // TODO Clean up duplicated code
82 filtObjQueueHead = CacheBuilder.newBuilder()
83 .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
84 .removalListener((RemovalNotification<FiltObjQueueKey, Objective> notification) -> {
85 Objective obj = notification.getValue();
86 switch (notification.getCause()) {
87 case EXPIRED:
88 case COLLECTED:
89 case SIZE:
90 obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
91 break;
92 case EXPLICIT: // No action when the objective completes correctly
93 case REPLACED: // No action when a pending forward or next objective gets executed
94 default:
95 break;
96 }
97 }).build();
98 fwdObjQueueHead = CacheBuilder.newBuilder()
99 .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
100 .removalListener((RemovalNotification<FwdObjQueueKey, Objective> notification) -> {
101 Objective obj = notification.getValue();
102 switch (notification.getCause()) {
103 case EXPIRED:
104 case COLLECTED:
105 case SIZE:
106 obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
107 break;
108 case EXPLICIT: // No action when the objective completes correctly
109 case REPLACED: // No action when a pending forward or next objective gets executed
110 default:
111 break;
112 }
113 }).build();
114 nextObjQueueHead = CacheBuilder.newBuilder()
115 .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
116 .removalListener((RemovalNotification<NextObjQueueKey, Objective> notification) -> {
117 Objective obj = notification.getValue();
118 switch (notification.getCause()) {
119 case EXPIRED:
120 case COLLECTED:
121 case SIZE:
122 obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
123 break;
124 case EXPLICIT: // No action when the objective completes correctly
125 case REPLACED: // No action when a pending forward or next objective gets executed
126 default:
127 break;
128 }
129 }).build();
130
131 cacheCleaner = newSingleThreadScheduledExecutor(groupedThreads("onos/flowobj", "cache-cleaner", log));
132 cacheCleaner.scheduleAtFixedRate(() -> {
133 filtObjQueueHead.cleanUp();
134 fwdObjQueueHead.cleanUp();
135 nextObjQueueHead.cleanUp();
136 }, 0, OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS);
137
Charles Chana7903c82018-03-15 20:14:16 -0700138 // Replace store delegate to make sure pendingForward and pendingNext are resubmitted to
Charles Chan45c19d72018-04-19 21:38:40 -0700139 // execute()
Charles Chana7903c82018-03-15 20:14:16 -0700140 flowObjectiveStore.unsetDelegate(super.delegate);
141 flowObjectiveStore.setDelegate(delegate);
142 }
143
144 @Deactivate
145 protected void deactivate() {
Charles Chan45c19d72018-04-19 21:38:40 -0700146 cacheCleaner.shutdown();
147 filtObjQueueHead.invalidateAll();
148 fwdObjQueueHead.invalidateAll();
149 nextObjQueueHead.invalidateAll();
150
Charles Chana7903c82018-03-15 20:14:16 -0700151 super.deactivate();
152 }
153
154 /**
155 * Processes given objective on given device.
156 * Objectives submitted through this method are guaranteed to be executed in order.
157 *
158 * @param deviceId Device ID
159 * @param originalObjective Flow objective to be executed
160 */
161 private void process(DeviceId deviceId, Objective originalObjective) {
162 // Inject ObjectiveContext such that we can get notified when it is completed
163 Objective.Builder objBuilder = originalObjective.copy();
164 Optional<ObjectiveContext> originalContext = originalObjective.context();
165 ObjectiveContext context = new ObjectiveContext() {
166 @Override
167 public void onSuccess(Objective objective) {
168 log.trace("Flow objective onSuccess {}", objective);
169 dequeue(deviceId, objective);
170 originalContext.ifPresent(c -> c.onSuccess(objective));
171 }
172 @Override
173 public void onError(Objective objective, ObjectiveError error) {
Charles Chan45c19d72018-04-19 21:38:40 -0700174 log.warn("Flow objective onError {}", objective);
Charles Chana7903c82018-03-15 20:14:16 -0700175 dequeue(deviceId, objective);
176 originalContext.ifPresent(c -> c.onError(objective, error));
177 }
178 };
179
180 // Preserve Objective.Operation
181 Objective objective;
182 switch (originalObjective.op()) {
183 case ADD:
184 objective = objBuilder.add(context);
185 break;
186 case ADD_TO_EXISTING:
187 objective = ((NextObjective.Builder) objBuilder).addToExisting(context);
188 break;
189 case REMOVE:
190 objective = objBuilder.remove(context);
191 break;
192 case REMOVE_FROM_EXISTING:
193 objective = ((NextObjective.Builder) objBuilder).removeFromExisting(context);
194 break;
195 case MODIFY:
196 objective = ((NextObjective.Builder) objBuilder).modify(context);
197 break;
198 case VERIFY:
199 objective = ((NextObjective.Builder) objBuilder).verify(context);
200 break;
201 default:
202 log.error("Unknown flow objecitve operation {}", originalObjective.op());
203 return;
204 }
205
206 enqueue(deviceId, objective);
207 }
208
209 @Override
210 public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
211 process(deviceId, filteringObjective);
212 }
213
214 @Override
215 public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
216 process(deviceId, forwardingObjective);
217 }
218
219 @Override
220 public void next(DeviceId deviceId, NextObjective nextObjective) {
221 process(deviceId, nextObjective);
222 }
223
224 /**
225 * Enqueue flow objective. Execute the flow objective if there is no pending objective ahead.
226 *
227 * @param deviceId Device ID
228 * @param obj Flow objective
229 */
230 private synchronized void enqueue(DeviceId deviceId, Objective obj) {
231 int queueSize;
232 int priority = obj.priority();
233
Charles Chanc09ad6d2018-04-04 16:31:23 -0700234 LogLevel logLevel = (obj.op() == Objective.Operation.VERIFY) ? LogLevel.TRACE : LogLevel.DEBUG;
235 Tools.log(log, logLevel, "Enqueue {}", obj);
236
Charles Chana7903c82018-03-15 20:14:16 -0700237 if (obj instanceof FilteringObjective) {
Charles Chana7903c82018-03-15 20:14:16 -0700238 FiltObjQueueKey k = new FiltObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
239 filtObjQueue.put(k, obj);
240 queueSize = filtObjQueue.get(k).size();
Charles Chana7903c82018-03-15 20:14:16 -0700241 } else if (obj instanceof ForwardingObjective) {
Charles Chana7903c82018-03-15 20:14:16 -0700242 FwdObjQueueKey k = new FwdObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
243 fwdObjQueue.put(k, obj);
244 queueSize = fwdObjQueue.get(k).size();
Charles Chana7903c82018-03-15 20:14:16 -0700245 } else if (obj instanceof NextObjective) {
Charles Chana7903c82018-03-15 20:14:16 -0700246 NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
247 nextObjQueue.put(k, obj);
248 queueSize = nextObjQueue.get(k).size();
Charles Chana7903c82018-03-15 20:14:16 -0700249 } else {
250 log.error("Unknown flow objective instance: {}", obj.getClass().getName());
251 return;
252 }
Charles Chanc09ad6d2018-04-04 16:31:23 -0700253 log.trace("{} queue size {}", obj.getClass().getSimpleName(), queueSize);
Charles Chana7903c82018-03-15 20:14:16 -0700254
255 // Execute immediately if there is no pending obj ahead
256 if (queueSize == 1) {
Charles Chana7903c82018-03-15 20:14:16 -0700257 execute(deviceId, obj);
258 }
259 }
260
261 /**
262 * Dequeue flow objective. Execute the next flow objective in the queue, if any.
263 *
264 * @param deviceId Device ID
265 * @param obj Flow objective
266 */
267 private synchronized void dequeue(DeviceId deviceId, Objective obj) {
268 List<Objective> remaining;
269 int priority = obj.priority();
270
Charles Chanc09ad6d2018-04-04 16:31:23 -0700271 LogLevel logLevel = (obj.op() == Objective.Operation.VERIFY) ? LogLevel.TRACE : LogLevel.DEBUG;
272 Tools.log(log, logLevel, "Dequeue {}", obj);
273
Charles Chana7903c82018-03-15 20:14:16 -0700274 if (obj instanceof FilteringObjective) {
Charles Chana7903c82018-03-15 20:14:16 -0700275 FiltObjQueueKey k = new FiltObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
Charles Chan45c19d72018-04-19 21:38:40 -0700276 filtObjQueueHead.invalidate(k);
Charles Chana7903c82018-03-15 20:14:16 -0700277 filtObjQueue.remove(k, obj);
278 remaining = filtObjQueue.get(k);
Charles Chana7903c82018-03-15 20:14:16 -0700279 } else if (obj instanceof ForwardingObjective) {
Charles Chana7903c82018-03-15 20:14:16 -0700280 FwdObjQueueKey k = new FwdObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
Charles Chan45c19d72018-04-19 21:38:40 -0700281 fwdObjQueueHead.invalidate(k);
Charles Chana7903c82018-03-15 20:14:16 -0700282 fwdObjQueue.remove(k, obj);
283 remaining = fwdObjQueue.get(k);
Charles Chana7903c82018-03-15 20:14:16 -0700284 } else if (obj instanceof NextObjective) {
Charles Chana7903c82018-03-15 20:14:16 -0700285 NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
Charles Chan45c19d72018-04-19 21:38:40 -0700286 nextObjQueueHead.invalidate(k);
Charles Chana7903c82018-03-15 20:14:16 -0700287 nextObjQueue.remove(k, obj);
288 remaining = nextObjQueue.get(k);
Charles Chana7903c82018-03-15 20:14:16 -0700289 } else {
290 log.error("Unknown flow objective instance: {}", obj.getClass().getName());
291 return;
292 }
Charles Chanc09ad6d2018-04-04 16:31:23 -0700293 log.trace("{} queue size {}", obj.getClass().getSimpleName(), remaining.size());
Charles Chana7903c82018-03-15 20:14:16 -0700294
295 // Submit the next one in the queue, if any
296 if (remaining.size() > 0) {
Charles Chana7903c82018-03-15 20:14:16 -0700297 execute(deviceId, remaining.get(0));
298 }
299 }
300
301 /**
302 * Submit the flow objective. Starting from this point on, the execution order is not guaranteed.
303 * Therefore we must be certain that this method is called in-order.
304 *
305 * @param deviceId Device ID
306 * @param obj Flow objective
307 */
308 private void execute(DeviceId deviceId, Objective obj) {
Charles Chanc09ad6d2018-04-04 16:31:23 -0700309 LogLevel logLevel = (obj.op() == Objective.Operation.VERIFY) ? LogLevel.TRACE : LogLevel.DEBUG;
310 Tools.log(log, logLevel, "Submit objective installer, deviceId {}, obj {}", deviceId, obj);
311
Charles Chan45c19d72018-04-19 21:38:40 -0700312 int priority = obj.priority();
Charles Chana7903c82018-03-15 20:14:16 -0700313 if (obj instanceof FilteringObjective) {
Charles Chan45c19d72018-04-19 21:38:40 -0700314 FiltObjQueueKey k = new FiltObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
315 filtObjQueueHead.put(k, obj);
Charles Chana7903c82018-03-15 20:14:16 -0700316 super.filter(deviceId, (FilteringObjective) obj);
317 } else if (obj instanceof ForwardingObjective) {
Charles Chan45c19d72018-04-19 21:38:40 -0700318 FwdObjQueueKey k = new FwdObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
319 fwdObjQueueHead.put(k, obj);
Charles Chana7903c82018-03-15 20:14:16 -0700320 super.forward(deviceId, (ForwardingObjective) obj);
321 } else if (obj instanceof NextObjective) {
Charles Chan45c19d72018-04-19 21:38:40 -0700322 NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
323 nextObjQueueHead.put(k, obj);
Charles Chana7903c82018-03-15 20:14:16 -0700324 super.next(deviceId, (NextObjective) obj);
325 } else {
326 log.error("Unknown flow objective instance: {}", obj.getClass().getName());
327 }
328 }
329
330 private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
331 @Override
332 public void notify(ObjectiveEvent event) {
333 if (event.type() == ObjectiveEvent.Type.ADD) {
334 log.debug("Received notification of obj event {}", event);
335 Set<PendingFlowObjective> pending;
336
337 // first send all pending flows
338 synchronized (pendingForwards) {
339 // needs to be synchronized for queueObjective lookup
340 pending = pendingForwards.remove(event.subject());
341 }
342 if (pending == null) {
Charles Chanc09ad6d2018-04-04 16:31:23 -0700343 log.debug("No forwarding objectives pending for this obj event {}", event);
Charles Chana7903c82018-03-15 20:14:16 -0700344 } else {
345 log.debug("Processing {} pending forwarding objectives for nextId {}",
346 pending.size(), event.subject());
Charles Chan45c19d72018-04-19 21:38:40 -0700347 // execute pending forwards one by one
Charles Chana7903c82018-03-15 20:14:16 -0700348 pending.forEach(p -> execute(p.deviceId(), p.flowObjective()));
349 }
350
351 // now check for pending next-objectives
352 // Note: This is still necessary despite the existence of in-order execution.
353 // Since the in-order execution does not handle the case of
354 // ADD_TO_EXISTING coming before ADD
355 List<PendingFlowObjective> pendNexts;
356 synchronized (pendingNexts) {
357 // needs to be synchronized for queueObjective lookup
358 pendNexts = pendingNexts.remove(event.subject());
359 }
360 if (pendNexts == null) {
Charles Chanc09ad6d2018-04-04 16:31:23 -0700361 log.debug("No next objectives pending for this obj event {}", event);
Charles Chana7903c82018-03-15 20:14:16 -0700362 } else {
363 log.debug("Processing {} pending next objectives for nextId {}",
364 pendNexts.size(), event.subject());
Charles Chan45c19d72018-04-19 21:38:40 -0700365 // execute pending nexts one by one
Charles Chana7903c82018-03-15 20:14:16 -0700366 pendNexts.forEach(p -> execute(p.deviceId(), p.flowObjective()));
367 }
368 }
369 }
370 }
371
372 private static class FiltObjQueueKey {
373 private DeviceId deviceId;
374 private int priority;
375 private Criterion key;
376
377 FiltObjQueueKey(DeviceId deviceId, int priority, Criterion key) {
378 this.deviceId = deviceId;
379 this.priority = priority;
380 this.key = key;
381 }
382
383 @Override
384 public int hashCode() {
385 return Objects.hash(deviceId, priority, key);
386 }
387
388 @Override
389 public boolean equals(Object other) {
390 if (this == other) {
391 return true;
392 }
393 if (!(other instanceof FiltObjQueueKey)) {
394 return false;
395 }
396 FiltObjQueueKey that = (FiltObjQueueKey) other;
397 return Objects.equals(this.deviceId, that.deviceId) &&
398 Objects.equals(this.priority, that.priority) &&
399 Objects.equals(this.key, that.key);
400 }
401 }
402
403 private static class FwdObjQueueKey {
404 private DeviceId deviceId;
405 private int priority;
406 private TrafficSelector selector;
407
408 FwdObjQueueKey(DeviceId deviceId, int priority, TrafficSelector selector) {
409 this.deviceId = deviceId;
410 this.priority = priority;
411 this.selector = selector;
412 }
413
414 @Override
415 public int hashCode() {
416 return Objects.hash(deviceId, priority, selector);
417 }
418
419 @Override
420 public boolean equals(Object other) {
421 if (this == other) {
422 return true;
423 }
424 if (!(other instanceof FwdObjQueueKey)) {
425 return false;
426 }
427 FwdObjQueueKey that = (FwdObjQueueKey) other;
428 return Objects.equals(this.deviceId, that.deviceId) &&
429 Objects.equals(this.priority, that.priority) &&
430 Objects.equals(this.selector, that.selector);
431 }
432 }
433
434 private static class NextObjQueueKey {
435 private DeviceId deviceId;
436 private int id;
437
438 NextObjQueueKey(DeviceId deviceId, int id) {
439 this.deviceId = deviceId;
440 this.id = id;
441 }
442
443 @Override
444 public int hashCode() {
445 return Objects.hash(deviceId, id);
446 }
447
448 @Override
449 public boolean equals(Object other) {
450 if (this == other) {
451 return true;
452 }
453 if (!(other instanceof NextObjQueueKey)) {
454 return false;
455 }
456 NextObjQueueKey that = (NextObjQueueKey) other;
457 return Objects.equals(this.deviceId, that.deviceId) &&
458 Objects.equals(this.id, that.id);
459 }
460 }
461}