blob: f6e6dd8385bf153acf8ebe187b80c4bdc4a38c08 [file] [log] [blame]
Charles Chana7903c82018-03-15 20:14:16 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.net.flowobjective.impl;
18
19import com.google.common.collect.ArrayListMultimap;
20import com.google.common.collect.ListMultimap;
21import com.google.common.collect.Multimaps;
22import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
25import org.apache.felix.scr.annotations.Service;
26import org.onosproject.net.DeviceId;
27import org.onosproject.net.flow.TrafficSelector;
28import org.onosproject.net.flow.criteria.Criterion;
29import org.onosproject.net.flowobjective.FilteringObjective;
30import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
31import org.onosproject.net.flowobjective.ForwardingObjective;
32import org.onosproject.net.flowobjective.NextObjective;
33import org.onosproject.net.flowobjective.Objective;
34import org.onosproject.net.flowobjective.ObjectiveContext;
35import org.onosproject.net.flowobjective.ObjectiveError;
36import org.onosproject.net.flowobjective.ObjectiveEvent;
37import org.slf4j.Logger;
38import org.slf4j.LoggerFactory;
39
40import java.util.List;
41import java.util.Objects;
42import java.util.Optional;
43import java.util.Set;
44
45@Component(immediate = true, enabled = true)
46@Service
47public class InOrderFlowObjectiveManager extends FlowObjectiveManager {
48 private final Logger log = LoggerFactory.getLogger(getClass());
49
50 // TODO Making these cache and timeout the entries
51 private ListMultimap<FiltObjQueueKey, Objective> filtObjQueue =
52 Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
53 private ListMultimap<FwdObjQueueKey, Objective> fwdObjQueue =
54 Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
55 private ListMultimap<NextObjQueueKey, Objective> nextObjQueue =
56 Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
57
58 final FlowObjectiveStoreDelegate delegate = new InternalStoreDelegate();
59
60 @Activate
61 protected void activate() {
62 super.activate();
63 // Replace store delegate to make sure pendingForward and pendingNext are resubmitted to
64 // process()
65 flowObjectiveStore.unsetDelegate(super.delegate);
66 flowObjectiveStore.setDelegate(delegate);
67 }
68
69 @Deactivate
70 protected void deactivate() {
71 super.deactivate();
72 }
73
74 /**
75 * Processes given objective on given device.
76 * Objectives submitted through this method are guaranteed to be executed in order.
77 *
78 * @param deviceId Device ID
79 * @param originalObjective Flow objective to be executed
80 */
81 private void process(DeviceId deviceId, Objective originalObjective) {
82 // Inject ObjectiveContext such that we can get notified when it is completed
83 Objective.Builder objBuilder = originalObjective.copy();
84 Optional<ObjectiveContext> originalContext = originalObjective.context();
85 ObjectiveContext context = new ObjectiveContext() {
86 @Override
87 public void onSuccess(Objective objective) {
88 log.trace("Flow objective onSuccess {}", objective);
89 dequeue(deviceId, objective);
90 originalContext.ifPresent(c -> c.onSuccess(objective));
91 }
92 @Override
93 public void onError(Objective objective, ObjectiveError error) {
94 log.trace("Flow objective onError {}", objective);
95 dequeue(deviceId, objective);
96 originalContext.ifPresent(c -> c.onError(objective, error));
97 }
98 };
99
100 // Preserve Objective.Operation
101 Objective objective;
102 switch (originalObjective.op()) {
103 case ADD:
104 objective = objBuilder.add(context);
105 break;
106 case ADD_TO_EXISTING:
107 objective = ((NextObjective.Builder) objBuilder).addToExisting(context);
108 break;
109 case REMOVE:
110 objective = objBuilder.remove(context);
111 break;
112 case REMOVE_FROM_EXISTING:
113 objective = ((NextObjective.Builder) objBuilder).removeFromExisting(context);
114 break;
115 case MODIFY:
116 objective = ((NextObjective.Builder) objBuilder).modify(context);
117 break;
118 case VERIFY:
119 objective = ((NextObjective.Builder) objBuilder).verify(context);
120 break;
121 default:
122 log.error("Unknown flow objecitve operation {}", originalObjective.op());
123 return;
124 }
125
126 enqueue(deviceId, objective);
127 }
128
129 @Override
130 public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
131 process(deviceId, filteringObjective);
132 }
133
134 @Override
135 public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
136 process(deviceId, forwardingObjective);
137 }
138
139 @Override
140 public void next(DeviceId deviceId, NextObjective nextObjective) {
141 process(deviceId, nextObjective);
142 }
143
144 /**
145 * Enqueue flow objective. Execute the flow objective if there is no pending objective ahead.
146 *
147 * @param deviceId Device ID
148 * @param obj Flow objective
149 */
150 private synchronized void enqueue(DeviceId deviceId, Objective obj) {
151 int queueSize;
152 int priority = obj.priority();
153
154 if (obj instanceof FilteringObjective) {
155 log.debug("Enqueue filtering objective {}", obj);
156 FiltObjQueueKey k = new FiltObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
157 filtObjQueue.put(k, obj);
158 queueSize = filtObjQueue.get(k).size();
159 log.debug("Filtering objective queue size {}", queueSize);
160 } else if (obj instanceof ForwardingObjective) {
161 log.debug("Enqueue forwarding objective {}", obj);
162 FwdObjQueueKey k = new FwdObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
163 fwdObjQueue.put(k, obj);
164 queueSize = fwdObjQueue.get(k).size();
165 log.debug("Forwarding objective queue size {}", queueSize);
166 } else if (obj instanceof NextObjective) {
167 log.debug("Enqueue next objective {}", obj);
168 NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
169 nextObjQueue.put(k, obj);
170 queueSize = nextObjQueue.get(k).size();
171 log.debug("Next objective queue size {}", queueSize);
172 } else {
173 log.error("Unknown flow objective instance: {}", obj.getClass().getName());
174 return;
175 }
176
177 // Execute immediately if there is no pending obj ahead
178 if (queueSize == 1) {
179 log.debug("First one. Submit objective installer, deviceId {}, obj {}", deviceId, obj);
180 execute(deviceId, obj);
181 }
182 }
183
184 /**
185 * Dequeue flow objective. Execute the next flow objective in the queue, if any.
186 *
187 * @param deviceId Device ID
188 * @param obj Flow objective
189 */
190 private synchronized void dequeue(DeviceId deviceId, Objective obj) {
191 List<Objective> remaining;
192 int priority = obj.priority();
193
194 if (obj instanceof FilteringObjective) {
195 log.debug("Dequeue filtering objective {}", obj);
196 FiltObjQueueKey k = new FiltObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
197 filtObjQueue.remove(k, obj);
198 remaining = filtObjQueue.get(k);
199 log.debug("Filtering objective queue size {}", remaining.size());
200 } else if (obj instanceof ForwardingObjective) {
201 log.debug("Dequeue forwarding objective {}", obj);
202 FwdObjQueueKey k = new FwdObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
203 fwdObjQueue.remove(k, obj);
204 remaining = fwdObjQueue.get(k);
205 log.debug("Forwarding objective queue size {}", remaining.size());
206 } else if (obj instanceof NextObjective) {
207 log.debug("Dequeue next objective {}", obj);
208 NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
209 nextObjQueue.remove(k, obj);
210 remaining = nextObjQueue.get(k);
211 log.debug("Next objective queue size {}", remaining.size());
212 } else {
213 log.error("Unknown flow objective instance: {}", obj.getClass().getName());
214 return;
215 }
216
217 // Submit the next one in the queue, if any
218 if (remaining.size() > 0) {
219 log.debug("Next one. Submit objective installer, deviceId {}, obj {}", deviceId, obj);
220 execute(deviceId, remaining.get(0));
221 }
222 }
223
224 /**
225 * Submit the flow objective. Starting from this point on, the execution order is not guaranteed.
226 * Therefore we must be certain that this method is called in-order.
227 *
228 * @param deviceId Device ID
229 * @param obj Flow objective
230 */
231 private void execute(DeviceId deviceId, Objective obj) {
232 if (obj instanceof FilteringObjective) {
233 super.filter(deviceId, (FilteringObjective) obj);
234 } else if (obj instanceof ForwardingObjective) {
235 super.forward(deviceId, (ForwardingObjective) obj);
236 } else if (obj instanceof NextObjective) {
237 super.next(deviceId, (NextObjective) obj);
238 } else {
239 log.error("Unknown flow objective instance: {}", obj.getClass().getName());
240 }
241 }
242
243 private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
244 @Override
245 public void notify(ObjectiveEvent event) {
246 if (event.type() == ObjectiveEvent.Type.ADD) {
247 log.debug("Received notification of obj event {}", event);
248 Set<PendingFlowObjective> pending;
249
250 // first send all pending flows
251 synchronized (pendingForwards) {
252 // needs to be synchronized for queueObjective lookup
253 pending = pendingForwards.remove(event.subject());
254 }
255 if (pending == null) {
256 log.debug("No forwarding objectives pending for this "
257 + "obj event {}", event);
258 } else {
259 log.debug("Processing {} pending forwarding objectives for nextId {}",
260 pending.size(), event.subject());
261 // resubmitted back to the execution queue
262 pending.forEach(p -> execute(p.deviceId(), p.flowObjective()));
263 }
264
265 // now check for pending next-objectives
266 // Note: This is still necessary despite the existence of in-order execution.
267 // Since the in-order execution does not handle the case of
268 // ADD_TO_EXISTING coming before ADD
269 List<PendingFlowObjective> pendNexts;
270 synchronized (pendingNexts) {
271 // needs to be synchronized for queueObjective lookup
272 pendNexts = pendingNexts.remove(event.subject());
273 }
274 if (pendNexts == null) {
275 log.debug("No next objectives pending for this "
276 + "obj event {}", event);
277 } else {
278 log.debug("Processing {} pending next objectives for nextId {}",
279 pendNexts.size(), event.subject());
280 // resubmitted back to the execution queue
281 pendNexts.forEach(p -> execute(p.deviceId(), p.flowObjective()));
282 }
283 }
284 }
285 }
286
287 private static class FiltObjQueueKey {
288 private DeviceId deviceId;
289 private int priority;
290 private Criterion key;
291
292 FiltObjQueueKey(DeviceId deviceId, int priority, Criterion key) {
293 this.deviceId = deviceId;
294 this.priority = priority;
295 this.key = key;
296 }
297
298 @Override
299 public int hashCode() {
300 return Objects.hash(deviceId, priority, key);
301 }
302
303 @Override
304 public boolean equals(Object other) {
305 if (this == other) {
306 return true;
307 }
308 if (!(other instanceof FiltObjQueueKey)) {
309 return false;
310 }
311 FiltObjQueueKey that = (FiltObjQueueKey) other;
312 return Objects.equals(this.deviceId, that.deviceId) &&
313 Objects.equals(this.priority, that.priority) &&
314 Objects.equals(this.key, that.key);
315 }
316 }
317
318 private static class FwdObjQueueKey {
319 private DeviceId deviceId;
320 private int priority;
321 private TrafficSelector selector;
322
323 FwdObjQueueKey(DeviceId deviceId, int priority, TrafficSelector selector) {
324 this.deviceId = deviceId;
325 this.priority = priority;
326 this.selector = selector;
327 }
328
329 @Override
330 public int hashCode() {
331 return Objects.hash(deviceId, priority, selector);
332 }
333
334 @Override
335 public boolean equals(Object other) {
336 if (this == other) {
337 return true;
338 }
339 if (!(other instanceof FwdObjQueueKey)) {
340 return false;
341 }
342 FwdObjQueueKey that = (FwdObjQueueKey) other;
343 return Objects.equals(this.deviceId, that.deviceId) &&
344 Objects.equals(this.priority, that.priority) &&
345 Objects.equals(this.selector, that.selector);
346 }
347 }
348
349 private static class NextObjQueueKey {
350 private DeviceId deviceId;
351 private int id;
352
353 NextObjQueueKey(DeviceId deviceId, int id) {
354 this.deviceId = deviceId;
355 this.id = id;
356 }
357
358 @Override
359 public int hashCode() {
360 return Objects.hash(deviceId, id);
361 }
362
363 @Override
364 public boolean equals(Object other) {
365 if (this == other) {
366 return true;
367 }
368 if (!(other instanceof NextObjQueueKey)) {
369 return false;
370 }
371 NextObjQueueKey that = (NextObjQueueKey) other;
372 return Objects.equals(this.deviceId, that.deviceId) &&
373 Objects.equals(this.id, that.id);
374 }
375 }
376}