blob: a90f84cb61954813ef8fbabbcf555b92924513d0 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.net.flowobjective.impl;
18
Charles Chan45c19d72018-04-19 21:38:40 -070019import com.google.common.cache.Cache;
20import com.google.common.cache.CacheBuilder;
21import com.google.common.cache.RemovalNotification;
Charles Chana7903c82018-03-15 20:14:16 -070022import com.google.common.collect.ArrayListMultimap;
23import com.google.common.collect.ListMultimap;
24import com.google.common.collect.Multimaps;
Charles Chanc09ad6d2018-04-04 16:31:23 -070025import org.onlab.util.Tools;
26import org.onlab.util.Tools.LogLevel;
Charles Chana7903c82018-03-15 20:14:16 -070027import org.onosproject.net.DeviceId;
Charles Chan33f4a912018-04-19 23:35:30 -070028import org.onosproject.net.flowobjective.FilteringObjQueueKey;
Charles Chana7903c82018-03-15 20:14:16 -070029import org.onosproject.net.flowobjective.FilteringObjective;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070030import org.onosproject.net.flowobjective.FlowObjectiveService;
Charles Chana7903c82018-03-15 20:14:16 -070031import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
Charles Chan33f4a912018-04-19 23:35:30 -070032import org.onosproject.net.flowobjective.ForwardingObjQueueKey;
Charles Chana7903c82018-03-15 20:14:16 -070033import org.onosproject.net.flowobjective.ForwardingObjective;
Charles Chan33f4a912018-04-19 23:35:30 -070034import org.onosproject.net.flowobjective.NextObjQueueKey;
Charles Chana7903c82018-03-15 20:14:16 -070035import org.onosproject.net.flowobjective.NextObjective;
36import org.onosproject.net.flowobjective.Objective;
37import org.onosproject.net.flowobjective.ObjectiveContext;
38import org.onosproject.net.flowobjective.ObjectiveError;
39import org.onosproject.net.flowobjective.ObjectiveEvent;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070040import org.osgi.service.component.annotations.Activate;
41import org.osgi.service.component.annotations.Component;
42import org.osgi.service.component.annotations.Deactivate;
Charles Chana7903c82018-03-15 20:14:16 -070043import org.slf4j.Logger;
44import org.slf4j.LoggerFactory;
45
46import java.util.List;
Charles Chan33f4a912018-04-19 23:35:30 -070047import java.util.Map;
Charles Chana7903c82018-03-15 20:14:16 -070048import java.util.Optional;
49import java.util.Set;
Charles Chan45c19d72018-04-19 21:38:40 -070050import java.util.concurrent.ScheduledExecutorService;
51import java.util.concurrent.TimeUnit;
52
53import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
54import static org.onlab.util.Tools.groupedThreads;
Charles Chana7903c82018-03-15 20:14:16 -070055
Ray Milkeyd84f89b2018-08-17 14:54:17 -070056@Component(immediate = true, service = FlowObjectiveService.class)
Charles Chana7903c82018-03-15 20:14:16 -070057public class InOrderFlowObjectiveManager extends FlowObjectiveManager {
58 private final Logger log = LoggerFactory.getLogger(getClass());
59
Charles Chan45c19d72018-04-19 21:38:40 -070060 // TODO Make queue timeout configurable
Charles Chan35b589c2018-04-26 16:00:23 -040061 static final int OBJ_TIMEOUT_MS = 15000;
Charles Chan45c19d72018-04-19 21:38:40 -070062
Charles Chan33f4a912018-04-19 23:35:30 -070063 private Cache<FilteringObjQueueKey, Objective> filtObjQueueHead;
64 private Cache<ForwardingObjQueueKey, Objective> fwdObjQueueHead;
Charles Chan45c19d72018-04-19 21:38:40 -070065 private Cache<NextObjQueueKey, Objective> nextObjQueueHead;
66 private ScheduledExecutorService cacheCleaner;
67
Charles Chan33f4a912018-04-19 23:35:30 -070068 private ListMultimap<FilteringObjQueueKey, Objective> filtObjQueue =
Charles Chana7903c82018-03-15 20:14:16 -070069 Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
Charles Chan33f4a912018-04-19 23:35:30 -070070 private ListMultimap<ForwardingObjQueueKey, Objective> fwdObjQueue =
Charles Chana7903c82018-03-15 20:14:16 -070071 Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
72 private ListMultimap<NextObjQueueKey, Objective> nextObjQueue =
73 Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
74
75 final FlowObjectiveStoreDelegate delegate = new InternalStoreDelegate();
76
77 @Activate
78 protected void activate() {
79 super.activate();
Charles Chan45c19d72018-04-19 21:38:40 -070080
81 // TODO Clean up duplicated code
82 filtObjQueueHead = CacheBuilder.newBuilder()
83 .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
Charles Chan33f4a912018-04-19 23:35:30 -070084 .removalListener((RemovalNotification<FilteringObjQueueKey, Objective> notification) -> {
Charles Chan45c19d72018-04-19 21:38:40 -070085 Objective obj = notification.getValue();
86 switch (notification.getCause()) {
87 case EXPIRED:
88 case COLLECTED:
89 case SIZE:
90 obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
91 break;
92 case EXPLICIT: // No action when the objective completes correctly
93 case REPLACED: // No action when a pending forward or next objective gets executed
94 default:
95 break;
96 }
97 }).build();
98 fwdObjQueueHead = CacheBuilder.newBuilder()
99 .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
Charles Chan33f4a912018-04-19 23:35:30 -0700100 .removalListener((RemovalNotification<ForwardingObjQueueKey, Objective> notification) -> {
Charles Chan45c19d72018-04-19 21:38:40 -0700101 Objective obj = notification.getValue();
102 switch (notification.getCause()) {
103 case EXPIRED:
104 case COLLECTED:
105 case SIZE:
106 obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
107 break;
108 case EXPLICIT: // No action when the objective completes correctly
109 case REPLACED: // No action when a pending forward or next objective gets executed
110 default:
111 break;
112 }
113 }).build();
114 nextObjQueueHead = CacheBuilder.newBuilder()
115 .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
116 .removalListener((RemovalNotification<NextObjQueueKey, Objective> notification) -> {
117 Objective obj = notification.getValue();
118 switch (notification.getCause()) {
119 case EXPIRED:
120 case COLLECTED:
121 case SIZE:
122 obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
123 break;
124 case EXPLICIT: // No action when the objective completes correctly
125 case REPLACED: // No action when a pending forward or next objective gets executed
126 default:
127 break;
128 }
129 }).build();
130
131 cacheCleaner = newSingleThreadScheduledExecutor(groupedThreads("onos/flowobj", "cache-cleaner", log));
132 cacheCleaner.scheduleAtFixedRate(() -> {
133 filtObjQueueHead.cleanUp();
134 fwdObjQueueHead.cleanUp();
135 nextObjQueueHead.cleanUp();
136 }, 0, OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS);
137
Charles Chana7903c82018-03-15 20:14:16 -0700138 // Replace store delegate to make sure pendingForward and pendingNext are resubmitted to
Charles Chan45c19d72018-04-19 21:38:40 -0700139 // execute()
Charles Chana7903c82018-03-15 20:14:16 -0700140 flowObjectiveStore.unsetDelegate(super.delegate);
141 flowObjectiveStore.setDelegate(delegate);
142 }
143
144 @Deactivate
145 protected void deactivate() {
Charles Chan45c19d72018-04-19 21:38:40 -0700146 cacheCleaner.shutdown();
Charles Chan33f4a912018-04-19 23:35:30 -0700147 clearQueue();
Charles Chan45c19d72018-04-19 21:38:40 -0700148
Charles Chana7903c82018-03-15 20:14:16 -0700149 super.deactivate();
150 }
151
152 /**
153 * Processes given objective on given device.
154 * Objectives submitted through this method are guaranteed to be executed in order.
155 *
156 * @param deviceId Device ID
157 * @param originalObjective Flow objective to be executed
158 */
159 private void process(DeviceId deviceId, Objective originalObjective) {
160 // Inject ObjectiveContext such that we can get notified when it is completed
161 Objective.Builder objBuilder = originalObjective.copy();
162 Optional<ObjectiveContext> originalContext = originalObjective.context();
163 ObjectiveContext context = new ObjectiveContext() {
164 @Override
165 public void onSuccess(Objective objective) {
166 log.trace("Flow objective onSuccess {}", objective);
Charles Chan35b589c2018-04-26 16:00:23 -0400167 dequeue(deviceId, objective, null);
Charles Chana7903c82018-03-15 20:14:16 -0700168 originalContext.ifPresent(c -> c.onSuccess(objective));
169 }
170 @Override
171 public void onError(Objective objective, ObjectiveError error) {
Charles Chan35b589c2018-04-26 16:00:23 -0400172 log.warn("Flow objective onError {}. Reason = {}", objective, error);
173 dequeue(deviceId, objective, error);
Charles Chana7903c82018-03-15 20:14:16 -0700174 originalContext.ifPresent(c -> c.onError(objective, error));
175 }
176 };
177
178 // Preserve Objective.Operation
179 Objective objective;
180 switch (originalObjective.op()) {
181 case ADD:
182 objective = objBuilder.add(context);
183 break;
184 case ADD_TO_EXISTING:
185 objective = ((NextObjective.Builder) objBuilder).addToExisting(context);
186 break;
187 case REMOVE:
188 objective = objBuilder.remove(context);
189 break;
190 case REMOVE_FROM_EXISTING:
191 objective = ((NextObjective.Builder) objBuilder).removeFromExisting(context);
192 break;
193 case MODIFY:
194 objective = ((NextObjective.Builder) objBuilder).modify(context);
195 break;
196 case VERIFY:
197 objective = ((NextObjective.Builder) objBuilder).verify(context);
198 break;
199 default:
200 log.error("Unknown flow objecitve operation {}", originalObjective.op());
201 return;
202 }
203
204 enqueue(deviceId, objective);
205 }
206
207 @Override
208 public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
209 process(deviceId, filteringObjective);
210 }
211
212 @Override
213 public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
214 process(deviceId, forwardingObjective);
215 }
216
217 @Override
218 public void next(DeviceId deviceId, NextObjective nextObjective) {
219 process(deviceId, nextObjective);
220 }
221
Charles Chan33f4a912018-04-19 23:35:30 -0700222 @Override
223 public ListMultimap<FilteringObjQueueKey, Objective> getFilteringObjQueue() {
224 return filtObjQueue;
225 }
226
227 @Override
228 public ListMultimap<ForwardingObjQueueKey, Objective> getForwardingObjQueue() {
229 return fwdObjQueue;
230 }
231
232 @Override
233 public ListMultimap<NextObjQueueKey, Objective> getNextObjQueue() {
234 return nextObjQueue;
235 }
236
237 @Override
238 public Map<FilteringObjQueueKey, Objective> getFilteringObjQueueHead() {
239 return filtObjQueueHead.asMap();
240 }
241
242 @Override
243 public Map<ForwardingObjQueueKey, Objective> getForwardingObjQueueHead() {
244 return fwdObjQueueHead.asMap();
245 }
246
247 @Override
248 public Map<NextObjQueueKey, Objective> getNextObjQueueHead() {
249 return nextObjQueueHead.asMap();
250 }
251
252 @Override
253 public void clearQueue() {
254 filtObjQueueHead.invalidateAll();
255 fwdObjQueueHead.invalidateAll();
256 nextObjQueueHead.invalidateAll();
257
258 filtObjQueueHead.cleanUp();
259 fwdObjQueueHead.cleanUp();
260 nextObjQueueHead.cleanUp();
261
262 filtObjQueue.clear();
263 fwdObjQueue.clear();
264 nextObjQueue.clear();
265 }
266
Charles Chana7903c82018-03-15 20:14:16 -0700267 /**
268 * Enqueue flow objective. Execute the flow objective if there is no pending objective ahead.
269 *
270 * @param deviceId Device ID
271 * @param obj Flow objective
272 */
273 private synchronized void enqueue(DeviceId deviceId, Objective obj) {
274 int queueSize;
275 int priority = obj.priority();
276
Charles Chanc09ad6d2018-04-04 16:31:23 -0700277 LogLevel logLevel = (obj.op() == Objective.Operation.VERIFY) ? LogLevel.TRACE : LogLevel.DEBUG;
278 Tools.log(log, logLevel, "Enqueue {}", obj);
279
Charles Chana7903c82018-03-15 20:14:16 -0700280 if (obj instanceof FilteringObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700281 FilteringObjQueueKey k = new FilteringObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
Charles Chana7903c82018-03-15 20:14:16 -0700282 filtObjQueue.put(k, obj);
283 queueSize = filtObjQueue.get(k).size();
Charles Chana7903c82018-03-15 20:14:16 -0700284 } else if (obj instanceof ForwardingObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700285 ForwardingObjQueueKey k =
286 new ForwardingObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
Charles Chana7903c82018-03-15 20:14:16 -0700287 fwdObjQueue.put(k, obj);
288 queueSize = fwdObjQueue.get(k).size();
Charles Chana7903c82018-03-15 20:14:16 -0700289 } else if (obj instanceof NextObjective) {
Charles Chana7903c82018-03-15 20:14:16 -0700290 NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
291 nextObjQueue.put(k, obj);
292 queueSize = nextObjQueue.get(k).size();
Charles Chana7903c82018-03-15 20:14:16 -0700293 } else {
294 log.error("Unknown flow objective instance: {}", obj.getClass().getName());
295 return;
296 }
Charles Chanc09ad6d2018-04-04 16:31:23 -0700297 log.trace("{} queue size {}", obj.getClass().getSimpleName(), queueSize);
Charles Chana7903c82018-03-15 20:14:16 -0700298
299 // Execute immediately if there is no pending obj ahead
300 if (queueSize == 1) {
Charles Chana7903c82018-03-15 20:14:16 -0700301 execute(deviceId, obj);
302 }
303 }
304
305 /**
306 * Dequeue flow objective. Execute the next flow objective in the queue, if any.
307 *
308 * @param deviceId Device ID
309 * @param obj Flow objective
Charles Chan35b589c2018-04-26 16:00:23 -0400310 * @param error ObjectiveError that triggers this dequeue. Null if this is not triggered by an error.
Charles Chana7903c82018-03-15 20:14:16 -0700311 */
Charles Chan35b589c2018-04-26 16:00:23 -0400312 private synchronized void dequeue(DeviceId deviceId, Objective obj, ObjectiveError error) {
Charles Chana7903c82018-03-15 20:14:16 -0700313 List<Objective> remaining;
314 int priority = obj.priority();
315
Charles Chanc09ad6d2018-04-04 16:31:23 -0700316 LogLevel logLevel = (obj.op() == Objective.Operation.VERIFY) ? LogLevel.TRACE : LogLevel.DEBUG;
317 Tools.log(log, logLevel, "Dequeue {}", obj);
318
Charles Chana7903c82018-03-15 20:14:16 -0700319 if (obj instanceof FilteringObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700320 FilteringObjQueueKey k = new FilteringObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
Charles Chan45c19d72018-04-19 21:38:40 -0700321 filtObjQueueHead.invalidate(k);
Charles Chana7903c82018-03-15 20:14:16 -0700322 filtObjQueue.remove(k, obj);
323 remaining = filtObjQueue.get(k);
Charles Chana7903c82018-03-15 20:14:16 -0700324 } else if (obj instanceof ForwardingObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700325 ForwardingObjQueueKey k =
326 new ForwardingObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
Charles Chan45c19d72018-04-19 21:38:40 -0700327 fwdObjQueueHead.invalidate(k);
Charles Chana7903c82018-03-15 20:14:16 -0700328 fwdObjQueue.remove(k, obj);
329 remaining = fwdObjQueue.get(k);
Charles Chana7903c82018-03-15 20:14:16 -0700330 } else if (obj instanceof NextObjective) {
Charles Chan35b589c2018-04-26 16:00:23 -0400331 if (error != null) {
332 // Remove pendingForwards and pendingNexts if next objective failed
333 Set<PendingFlowObjective> removedForwards = pendingForwards.remove(obj.id());
334 List<PendingFlowObjective> removedNexts = pendingNexts.remove(obj.id());
335
336 if (removedForwards != null) {
337 removedForwards.stream().map(PendingFlowObjective::flowObjective)
338 .forEach(pendingObj -> pendingObj.context().ifPresent(c ->
339 c.onError(pendingObj, error)));
340 }
341 if (removedNexts != null) {
342 removedNexts.stream().map(PendingFlowObjective::flowObjective)
343 .forEach(pendingObj -> pendingObj.context().ifPresent(c ->
344 c.onError(pendingObj, error)));
345 }
346 }
Charles Chana7903c82018-03-15 20:14:16 -0700347 NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
Charles Chan45c19d72018-04-19 21:38:40 -0700348 nextObjQueueHead.invalidate(k);
Charles Chana7903c82018-03-15 20:14:16 -0700349 nextObjQueue.remove(k, obj);
350 remaining = nextObjQueue.get(k);
Charles Chana7903c82018-03-15 20:14:16 -0700351 } else {
352 log.error("Unknown flow objective instance: {}", obj.getClass().getName());
353 return;
354 }
Charles Chanc09ad6d2018-04-04 16:31:23 -0700355 log.trace("{} queue size {}", obj.getClass().getSimpleName(), remaining.size());
Charles Chana7903c82018-03-15 20:14:16 -0700356
357 // Submit the next one in the queue, if any
358 if (remaining.size() > 0) {
Charles Chana7903c82018-03-15 20:14:16 -0700359 execute(deviceId, remaining.get(0));
360 }
361 }
362
363 /**
364 * Submit the flow objective. Starting from this point on, the execution order is not guaranteed.
365 * Therefore we must be certain that this method is called in-order.
366 *
367 * @param deviceId Device ID
368 * @param obj Flow objective
369 */
370 private void execute(DeviceId deviceId, Objective obj) {
Charles Chanc09ad6d2018-04-04 16:31:23 -0700371 LogLevel logLevel = (obj.op() == Objective.Operation.VERIFY) ? LogLevel.TRACE : LogLevel.DEBUG;
372 Tools.log(log, logLevel, "Submit objective installer, deviceId {}, obj {}", deviceId, obj);
373
Charles Chan45c19d72018-04-19 21:38:40 -0700374 int priority = obj.priority();
Charles Chana7903c82018-03-15 20:14:16 -0700375 if (obj instanceof FilteringObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700376 FilteringObjQueueKey k = new FilteringObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
Charles Chan45c19d72018-04-19 21:38:40 -0700377 filtObjQueueHead.put(k, obj);
Charles Chana7903c82018-03-15 20:14:16 -0700378 super.filter(deviceId, (FilteringObjective) obj);
379 } else if (obj instanceof ForwardingObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700380 ForwardingObjQueueKey k =
381 new ForwardingObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
Charles Chan45c19d72018-04-19 21:38:40 -0700382 fwdObjQueueHead.put(k, obj);
Charles Chana7903c82018-03-15 20:14:16 -0700383 super.forward(deviceId, (ForwardingObjective) obj);
384 } else if (obj instanceof NextObjective) {
Charles Chan45c19d72018-04-19 21:38:40 -0700385 NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
386 nextObjQueueHead.put(k, obj);
Charles Chana7903c82018-03-15 20:14:16 -0700387 super.next(deviceId, (NextObjective) obj);
388 } else {
389 log.error("Unknown flow objective instance: {}", obj.getClass().getName());
390 }
391 }
392
393 private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
394 @Override
395 public void notify(ObjectiveEvent event) {
396 if (event.type() == ObjectiveEvent.Type.ADD) {
397 log.debug("Received notification of obj event {}", event);
398 Set<PendingFlowObjective> pending;
399
400 // first send all pending flows
401 synchronized (pendingForwards) {
402 // needs to be synchronized for queueObjective lookup
403 pending = pendingForwards.remove(event.subject());
404 }
405 if (pending == null) {
Charles Chanc09ad6d2018-04-04 16:31:23 -0700406 log.debug("No forwarding objectives pending for this obj event {}", event);
Charles Chana7903c82018-03-15 20:14:16 -0700407 } else {
408 log.debug("Processing {} pending forwarding objectives for nextId {}",
409 pending.size(), event.subject());
Charles Chan45c19d72018-04-19 21:38:40 -0700410 // execute pending forwards one by one
Charles Chana7903c82018-03-15 20:14:16 -0700411 pending.forEach(p -> execute(p.deviceId(), p.flowObjective()));
412 }
413
414 // now check for pending next-objectives
415 // Note: This is still necessary despite the existence of in-order execution.
416 // Since the in-order execution does not handle the case of
417 // ADD_TO_EXISTING coming before ADD
418 List<PendingFlowObjective> pendNexts;
419 synchronized (pendingNexts) {
420 // needs to be synchronized for queueObjective lookup
421 pendNexts = pendingNexts.remove(event.subject());
422 }
423 if (pendNexts == null) {
Charles Chanc09ad6d2018-04-04 16:31:23 -0700424 log.debug("No next objectives pending for this obj event {}", event);
Charles Chana7903c82018-03-15 20:14:16 -0700425 } else {
426 log.debug("Processing {} pending next objectives for nextId {}",
427 pendNexts.size(), event.subject());
Charles Chan45c19d72018-04-19 21:38:40 -0700428 // execute pending nexts one by one
Charles Chana7903c82018-03-15 20:14:16 -0700429 pendNexts.forEach(p -> execute(p.deviceId(), p.flowObjective()));
430 }
431 }
432 }
433 }
Charles Chana7903c82018-03-15 20:14:16 -0700434}