blob: 823891ffc5c6edee3e1e98626ec76d5dc7cf2fec [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.net.flowobjective.impl;
18
19import com.google.common.collect.ArrayListMultimap;
20import com.google.common.collect.ListMultimap;
21import com.google.common.collect.Multimaps;
22import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
25import org.apache.felix.scr.annotations.Service;
Charles Chanc09ad6d2018-04-04 16:31:23 -070026import org.onlab.util.Tools;
27import org.onlab.util.Tools.LogLevel;
Charles Chana7903c82018-03-15 20:14:16 -070028import org.onosproject.net.DeviceId;
29import org.onosproject.net.flow.TrafficSelector;
30import org.onosproject.net.flow.criteria.Criterion;
31import org.onosproject.net.flowobjective.FilteringObjective;
32import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
33import org.onosproject.net.flowobjective.ForwardingObjective;
34import org.onosproject.net.flowobjective.NextObjective;
35import org.onosproject.net.flowobjective.Objective;
36import org.onosproject.net.flowobjective.ObjectiveContext;
37import org.onosproject.net.flowobjective.ObjectiveError;
38import org.onosproject.net.flowobjective.ObjectiveEvent;
39import org.slf4j.Logger;
40import org.slf4j.LoggerFactory;
41
42import java.util.List;
43import java.util.Objects;
44import java.util.Optional;
45import java.util.Set;
46
47@Component(immediate = true, enabled = true)
48@Service
49public class InOrderFlowObjectiveManager extends FlowObjectiveManager {
50 private final Logger log = LoggerFactory.getLogger(getClass());
51
52 // TODO Making these cache and timeout the entries
53 private ListMultimap<FiltObjQueueKey, Objective> filtObjQueue =
54 Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
55 private ListMultimap<FwdObjQueueKey, Objective> fwdObjQueue =
56 Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
57 private ListMultimap<NextObjQueueKey, Objective> nextObjQueue =
58 Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
59
60 final FlowObjectiveStoreDelegate delegate = new InternalStoreDelegate();
61
62 @Activate
63 protected void activate() {
64 super.activate();
65 // Replace store delegate to make sure pendingForward and pendingNext are resubmitted to
66 // process()
67 flowObjectiveStore.unsetDelegate(super.delegate);
68 flowObjectiveStore.setDelegate(delegate);
69 }
70
71 @Deactivate
72 protected void deactivate() {
73 super.deactivate();
74 }
75
76 /**
77 * Processes given objective on given device.
78 * Objectives submitted through this method are guaranteed to be executed in order.
79 *
80 * @param deviceId Device ID
81 * @param originalObjective Flow objective to be executed
82 */
83 private void process(DeviceId deviceId, Objective originalObjective) {
84 // Inject ObjectiveContext such that we can get notified when it is completed
85 Objective.Builder objBuilder = originalObjective.copy();
86 Optional<ObjectiveContext> originalContext = originalObjective.context();
87 ObjectiveContext context = new ObjectiveContext() {
88 @Override
89 public void onSuccess(Objective objective) {
90 log.trace("Flow objective onSuccess {}", objective);
91 dequeue(deviceId, objective);
92 originalContext.ifPresent(c -> c.onSuccess(objective));
93 }
94 @Override
95 public void onError(Objective objective, ObjectiveError error) {
96 log.trace("Flow objective onError {}", objective);
97 dequeue(deviceId, objective);
98 originalContext.ifPresent(c -> c.onError(objective, error));
99 }
100 };
101
102 // Preserve Objective.Operation
103 Objective objective;
104 switch (originalObjective.op()) {
105 case ADD:
106 objective = objBuilder.add(context);
107 break;
108 case ADD_TO_EXISTING:
109 objective = ((NextObjective.Builder) objBuilder).addToExisting(context);
110 break;
111 case REMOVE:
112 objective = objBuilder.remove(context);
113 break;
114 case REMOVE_FROM_EXISTING:
115 objective = ((NextObjective.Builder) objBuilder).removeFromExisting(context);
116 break;
117 case MODIFY:
118 objective = ((NextObjective.Builder) objBuilder).modify(context);
119 break;
120 case VERIFY:
121 objective = ((NextObjective.Builder) objBuilder).verify(context);
122 break;
123 default:
124 log.error("Unknown flow objecitve operation {}", originalObjective.op());
125 return;
126 }
127
128 enqueue(deviceId, objective);
129 }
130
131 @Override
132 public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
133 process(deviceId, filteringObjective);
134 }
135
136 @Override
137 public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
138 process(deviceId, forwardingObjective);
139 }
140
141 @Override
142 public void next(DeviceId deviceId, NextObjective nextObjective) {
143 process(deviceId, nextObjective);
144 }
145
146 /**
147 * Enqueue flow objective. Execute the flow objective if there is no pending objective ahead.
148 *
149 * @param deviceId Device ID
150 * @param obj Flow objective
151 */
152 private synchronized void enqueue(DeviceId deviceId, Objective obj) {
153 int queueSize;
154 int priority = obj.priority();
155
Charles Chanc09ad6d2018-04-04 16:31:23 -0700156 LogLevel logLevel = (obj.op() == Objective.Operation.VERIFY) ? LogLevel.TRACE : LogLevel.DEBUG;
157 Tools.log(log, logLevel, "Enqueue {}", obj);
158
Charles Chana7903c82018-03-15 20:14:16 -0700159 if (obj instanceof FilteringObjective) {
Charles Chana7903c82018-03-15 20:14:16 -0700160 FiltObjQueueKey k = new FiltObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
161 filtObjQueue.put(k, obj);
162 queueSize = filtObjQueue.get(k).size();
Charles Chana7903c82018-03-15 20:14:16 -0700163 } else if (obj instanceof ForwardingObjective) {
Charles Chana7903c82018-03-15 20:14:16 -0700164 FwdObjQueueKey k = new FwdObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
165 fwdObjQueue.put(k, obj);
166 queueSize = fwdObjQueue.get(k).size();
Charles Chana7903c82018-03-15 20:14:16 -0700167 } else if (obj instanceof NextObjective) {
Charles Chana7903c82018-03-15 20:14:16 -0700168 NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
169 nextObjQueue.put(k, obj);
170 queueSize = nextObjQueue.get(k).size();
Charles Chana7903c82018-03-15 20:14:16 -0700171 } else {
172 log.error("Unknown flow objective instance: {}", obj.getClass().getName());
173 return;
174 }
Charles Chanc09ad6d2018-04-04 16:31:23 -0700175 log.trace("{} queue size {}", obj.getClass().getSimpleName(), queueSize);
Charles Chana7903c82018-03-15 20:14:16 -0700176
177 // Execute immediately if there is no pending obj ahead
178 if (queueSize == 1) {
Charles Chana7903c82018-03-15 20:14:16 -0700179 execute(deviceId, obj);
180 }
181 }
182
183 /**
184 * Dequeue flow objective. Execute the next flow objective in the queue, if any.
185 *
186 * @param deviceId Device ID
187 * @param obj Flow objective
188 */
189 private synchronized void dequeue(DeviceId deviceId, Objective obj) {
190 List<Objective> remaining;
191 int priority = obj.priority();
192
Charles Chanc09ad6d2018-04-04 16:31:23 -0700193 LogLevel logLevel = (obj.op() == Objective.Operation.VERIFY) ? LogLevel.TRACE : LogLevel.DEBUG;
194 Tools.log(log, logLevel, "Dequeue {}", obj);
195
Charles Chana7903c82018-03-15 20:14:16 -0700196 if (obj instanceof FilteringObjective) {
Charles Chana7903c82018-03-15 20:14:16 -0700197 FiltObjQueueKey k = new FiltObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
198 filtObjQueue.remove(k, obj);
199 remaining = filtObjQueue.get(k);
Charles Chana7903c82018-03-15 20:14:16 -0700200 } else if (obj instanceof ForwardingObjective) {
Charles Chana7903c82018-03-15 20:14:16 -0700201 FwdObjQueueKey k = new FwdObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
202 fwdObjQueue.remove(k, obj);
203 remaining = fwdObjQueue.get(k);
Charles Chana7903c82018-03-15 20:14:16 -0700204 } else if (obj instanceof NextObjective) {
Charles Chana7903c82018-03-15 20:14:16 -0700205 NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
206 nextObjQueue.remove(k, obj);
207 remaining = nextObjQueue.get(k);
Charles Chana7903c82018-03-15 20:14:16 -0700208 } else {
209 log.error("Unknown flow objective instance: {}", obj.getClass().getName());
210 return;
211 }
Charles Chanc09ad6d2018-04-04 16:31:23 -0700212 log.trace("{} queue size {}", obj.getClass().getSimpleName(), remaining.size());
Charles Chana7903c82018-03-15 20:14:16 -0700213
214 // Submit the next one in the queue, if any
215 if (remaining.size() > 0) {
Charles Chana7903c82018-03-15 20:14:16 -0700216 execute(deviceId, remaining.get(0));
217 }
218 }
219
220 /**
221 * Submit the flow objective. Starting from this point on, the execution order is not guaranteed.
222 * Therefore we must be certain that this method is called in-order.
223 *
224 * @param deviceId Device ID
225 * @param obj Flow objective
226 */
227 private void execute(DeviceId deviceId, Objective obj) {
Charles Chanc09ad6d2018-04-04 16:31:23 -0700228 LogLevel logLevel = (obj.op() == Objective.Operation.VERIFY) ? LogLevel.TRACE : LogLevel.DEBUG;
229 Tools.log(log, logLevel, "Submit objective installer, deviceId {}, obj {}", deviceId, obj);
230
Charles Chana7903c82018-03-15 20:14:16 -0700231 if (obj instanceof FilteringObjective) {
232 super.filter(deviceId, (FilteringObjective) obj);
233 } else if (obj instanceof ForwardingObjective) {
234 super.forward(deviceId, (ForwardingObjective) obj);
235 } else if (obj instanceof NextObjective) {
236 super.next(deviceId, (NextObjective) obj);
237 } else {
238 log.error("Unknown flow objective instance: {}", obj.getClass().getName());
239 }
240 }
241
242 private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
243 @Override
244 public void notify(ObjectiveEvent event) {
245 if (event.type() == ObjectiveEvent.Type.ADD) {
246 log.debug("Received notification of obj event {}", event);
247 Set<PendingFlowObjective> pending;
248
249 // first send all pending flows
250 synchronized (pendingForwards) {
251 // needs to be synchronized for queueObjective lookup
252 pending = pendingForwards.remove(event.subject());
253 }
254 if (pending == null) {
Charles Chanc09ad6d2018-04-04 16:31:23 -0700255 log.debug("No forwarding objectives pending for this obj event {}", event);
Charles Chana7903c82018-03-15 20:14:16 -0700256 } else {
257 log.debug("Processing {} pending forwarding objectives for nextId {}",
258 pending.size(), event.subject());
259 // resubmitted back to the execution queue
260 pending.forEach(p -> execute(p.deviceId(), p.flowObjective()));
261 }
262
263 // now check for pending next-objectives
264 // Note: This is still necessary despite the existence of in-order execution.
265 // Since the in-order execution does not handle the case of
266 // ADD_TO_EXISTING coming before ADD
267 List<PendingFlowObjective> pendNexts;
268 synchronized (pendingNexts) {
269 // needs to be synchronized for queueObjective lookup
270 pendNexts = pendingNexts.remove(event.subject());
271 }
272 if (pendNexts == null) {
Charles Chanc09ad6d2018-04-04 16:31:23 -0700273 log.debug("No next objectives pending for this obj event {}", event);
Charles Chana7903c82018-03-15 20:14:16 -0700274 } else {
275 log.debug("Processing {} pending next objectives for nextId {}",
276 pendNexts.size(), event.subject());
277 // resubmitted back to the execution queue
278 pendNexts.forEach(p -> execute(p.deviceId(), p.flowObjective()));
279 }
280 }
281 }
282 }
283
284 private static class FiltObjQueueKey {
285 private DeviceId deviceId;
286 private int priority;
287 private Criterion key;
288
289 FiltObjQueueKey(DeviceId deviceId, int priority, Criterion key) {
290 this.deviceId = deviceId;
291 this.priority = priority;
292 this.key = key;
293 }
294
295 @Override
296 public int hashCode() {
297 return Objects.hash(deviceId, priority, key);
298 }
299
300 @Override
301 public boolean equals(Object other) {
302 if (this == other) {
303 return true;
304 }
305 if (!(other instanceof FiltObjQueueKey)) {
306 return false;
307 }
308 FiltObjQueueKey that = (FiltObjQueueKey) other;
309 return Objects.equals(this.deviceId, that.deviceId) &&
310 Objects.equals(this.priority, that.priority) &&
311 Objects.equals(this.key, that.key);
312 }
313 }
314
315 private static class FwdObjQueueKey {
316 private DeviceId deviceId;
317 private int priority;
318 private TrafficSelector selector;
319
320 FwdObjQueueKey(DeviceId deviceId, int priority, TrafficSelector selector) {
321 this.deviceId = deviceId;
322 this.priority = priority;
323 this.selector = selector;
324 }
325
326 @Override
327 public int hashCode() {
328 return Objects.hash(deviceId, priority, selector);
329 }
330
331 @Override
332 public boolean equals(Object other) {
333 if (this == other) {
334 return true;
335 }
336 if (!(other instanceof FwdObjQueueKey)) {
337 return false;
338 }
339 FwdObjQueueKey that = (FwdObjQueueKey) other;
340 return Objects.equals(this.deviceId, that.deviceId) &&
341 Objects.equals(this.priority, that.priority) &&
342 Objects.equals(this.selector, that.selector);
343 }
344 }
345
346 private static class NextObjQueueKey {
347 private DeviceId deviceId;
348 private int id;
349
350 NextObjQueueKey(DeviceId deviceId, int id) {
351 this.deviceId = deviceId;
352 this.id = id;
353 }
354
355 @Override
356 public int hashCode() {
357 return Objects.hash(deviceId, id);
358 }
359
360 @Override
361 public boolean equals(Object other) {
362 if (this == other) {
363 return true;
364 }
365 if (!(other instanceof NextObjQueueKey)) {
366 return false;
367 }
368 NextObjQueueKey that = (NextObjQueueKey) other;
369 return Objects.equals(this.deviceId, that.deviceId) &&
370 Objects.equals(this.id, that.id);
371 }
372 }
373}