blob: f625e9ca536c1a18a1e43850ab425fe6f80b6cd4 [file] [log] [blame]
Charles Chana7903c82018-03-15 20:14:16 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.net.flowobjective.impl;
18
Charles Chan45c19d72018-04-19 21:38:40 -070019import com.google.common.cache.Cache;
20import com.google.common.cache.CacheBuilder;
Charles Chana724c5132018-11-27 21:33:33 +080021import com.google.common.cache.RemovalListener;
22import com.google.common.cache.RemovalListeners;
Charles Chana7903c82018-03-15 20:14:16 -070023import com.google.common.collect.ArrayListMultimap;
24import com.google.common.collect.ListMultimap;
25import com.google.common.collect.Multimaps;
Charles Chanc09ad6d2018-04-04 16:31:23 -070026import org.onlab.util.Tools;
27import org.onlab.util.Tools.LogLevel;
Charles Chana7903c82018-03-15 20:14:16 -070028import org.onosproject.net.DeviceId;
Charles Chan33f4a912018-04-19 23:35:30 -070029import org.onosproject.net.flowobjective.FilteringObjQueueKey;
Charles Chana7903c82018-03-15 20:14:16 -070030import org.onosproject.net.flowobjective.FilteringObjective;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070031import org.onosproject.net.flowobjective.FlowObjectiveService;
Charles Chana7903c82018-03-15 20:14:16 -070032import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
Charles Chan33f4a912018-04-19 23:35:30 -070033import org.onosproject.net.flowobjective.ForwardingObjQueueKey;
Charles Chana7903c82018-03-15 20:14:16 -070034import org.onosproject.net.flowobjective.ForwardingObjective;
Charles Chan33f4a912018-04-19 23:35:30 -070035import org.onosproject.net.flowobjective.NextObjQueueKey;
Charles Chana7903c82018-03-15 20:14:16 -070036import org.onosproject.net.flowobjective.NextObjective;
37import org.onosproject.net.flowobjective.Objective;
38import org.onosproject.net.flowobjective.ObjectiveContext;
39import org.onosproject.net.flowobjective.ObjectiveError;
40import org.onosproject.net.flowobjective.ObjectiveEvent;
Charles Chana724c5132018-11-27 21:33:33 +080041import org.onosproject.net.flowobjective.ObjectiveQueueKey;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070042import org.osgi.service.component.annotations.Activate;
43import org.osgi.service.component.annotations.Component;
44import org.osgi.service.component.annotations.Deactivate;
Charles Chana7903c82018-03-15 20:14:16 -070045import org.slf4j.Logger;
46import org.slf4j.LoggerFactory;
47
48import java.util.List;
Charles Chan33f4a912018-04-19 23:35:30 -070049import java.util.Map;
Charles Chana724c5132018-11-27 21:33:33 +080050import java.util.Objects;
Charles Chana7903c82018-03-15 20:14:16 -070051import java.util.Optional;
52import java.util.Set;
Charles Chana724c5132018-11-27 21:33:33 +080053import java.util.concurrent.ExecutorService;
Charles Chan45c19d72018-04-19 21:38:40 -070054import java.util.concurrent.ScheduledExecutorService;
55import java.util.concurrent.TimeUnit;
56
Charles Chana724c5132018-11-27 21:33:33 +080057import static java.util.concurrent.Executors.newSingleThreadExecutor;
Charles Chan45c19d72018-04-19 21:38:40 -070058import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
59import static org.onlab.util.Tools.groupedThreads;
Charles Chana7903c82018-03-15 20:14:16 -070060
Ray Milkeyd84f89b2018-08-17 14:54:17 -070061@Component(immediate = true, service = FlowObjectiveService.class)
Charles Chana7903c82018-03-15 20:14:16 -070062public class InOrderFlowObjectiveManager extends FlowObjectiveManager {
63 private final Logger log = LoggerFactory.getLogger(getClass());
64
Charles Chan45c19d72018-04-19 21:38:40 -070065 // TODO Make queue timeout configurable
Charles Chanb4f4fdb2018-12-21 13:55:29 -080066 static final int DEFAULT_OBJ_TIMEOUT = 15000;
67 int objTimeoutMs = DEFAULT_OBJ_TIMEOUT;
Charles Chan45c19d72018-04-19 21:38:40 -070068
Charles Chan33f4a912018-04-19 23:35:30 -070069 private Cache<FilteringObjQueueKey, Objective> filtObjQueueHead;
70 private Cache<ForwardingObjQueueKey, Objective> fwdObjQueueHead;
Charles Chan45c19d72018-04-19 21:38:40 -070071 private Cache<NextObjQueueKey, Objective> nextObjQueueHead;
72 private ScheduledExecutorService cacheCleaner;
Charles Chana724c5132018-11-27 21:33:33 +080073 private ExecutorService filtCacheEventExecutor;
74 private ExecutorService fwdCacheEventExecutor;
75 private ExecutorService nextCacheEventExecutor;
Charles Chan45c19d72018-04-19 21:38:40 -070076
Charles Chan33f4a912018-04-19 23:35:30 -070077 private ListMultimap<FilteringObjQueueKey, Objective> filtObjQueue =
Charles Chana7903c82018-03-15 20:14:16 -070078 Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
Charles Chan33f4a912018-04-19 23:35:30 -070079 private ListMultimap<ForwardingObjQueueKey, Objective> fwdObjQueue =
Charles Chana7903c82018-03-15 20:14:16 -070080 Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
81 private ListMultimap<NextObjQueueKey, Objective> nextObjQueue =
82 Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
83
84 final FlowObjectiveStoreDelegate delegate = new InternalStoreDelegate();
85
86 @Activate
87 protected void activate() {
88 super.activate();
Charles Chan45c19d72018-04-19 21:38:40 -070089
Charles Chana724c5132018-11-27 21:33:33 +080090 filtCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-filt", log));
91 fwdCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-fwd", log));
92 nextCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-next", log));
93
94 RemovalListener<ObjectiveQueueKey, Objective> removalListener = notification -> {
95 Objective obj = notification.getValue();
96 switch (notification.getCause()) {
97 case EXPIRED:
98 case COLLECTED:
99 case SIZE:
100 obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
101 break;
102 case EXPLICIT: // No action when the objective completes correctly
103 case REPLACED: // No action when a pending forward or next objective gets executed
104 default:
105 break;
106 }
107 };
Charles Chan45c19d72018-04-19 21:38:40 -0700108 filtObjQueueHead = CacheBuilder.newBuilder()
Charles Chan1491b9b2018-11-27 21:33:33 +0800109 .expireAfterWrite(objTimeoutMs, TimeUnit.MILLISECONDS)
Charles Chana724c5132018-11-27 21:33:33 +0800110 .removalListener(RemovalListeners.asynchronous(removalListener, filtCacheEventExecutor))
111 .build();
Charles Chan45c19d72018-04-19 21:38:40 -0700112 fwdObjQueueHead = CacheBuilder.newBuilder()
Charles Chan1491b9b2018-11-27 21:33:33 +0800113 .expireAfterWrite(objTimeoutMs, TimeUnit.MILLISECONDS)
Charles Chana724c5132018-11-27 21:33:33 +0800114 .removalListener(RemovalListeners.asynchronous(removalListener, fwdCacheEventExecutor))
115 .build();
Charles Chan45c19d72018-04-19 21:38:40 -0700116 nextObjQueueHead = CacheBuilder.newBuilder()
Charles Chan1491b9b2018-11-27 21:33:33 +0800117 .expireAfterWrite(objTimeoutMs, TimeUnit.MILLISECONDS)
Charles Chana724c5132018-11-27 21:33:33 +0800118 .removalListener(RemovalListeners.asynchronous(removalListener, nextCacheEventExecutor))
119 .build();
Charles Chan45c19d72018-04-19 21:38:40 -0700120
121 cacheCleaner = newSingleThreadScheduledExecutor(groupedThreads("onos/flowobj", "cache-cleaner", log));
122 cacheCleaner.scheduleAtFixedRate(() -> {
123 filtObjQueueHead.cleanUp();
124 fwdObjQueueHead.cleanUp();
125 nextObjQueueHead.cleanUp();
Charles Chan1491b9b2018-11-27 21:33:33 +0800126 }, 0, objTimeoutMs, TimeUnit.MILLISECONDS);
Charles Chan45c19d72018-04-19 21:38:40 -0700127
Charles Chana7903c82018-03-15 20:14:16 -0700128 // Replace store delegate to make sure pendingForward and pendingNext are resubmitted to
Charles Chan45c19d72018-04-19 21:38:40 -0700129 // execute()
Charles Chana7903c82018-03-15 20:14:16 -0700130 flowObjectiveStore.unsetDelegate(super.delegate);
131 flowObjectiveStore.setDelegate(delegate);
132 }
133
134 @Deactivate
135 protected void deactivate() {
Charles Chan45c19d72018-04-19 21:38:40 -0700136 cacheCleaner.shutdown();
Charles Chan33f4a912018-04-19 23:35:30 -0700137 clearQueue();
Charles Chan45c19d72018-04-19 21:38:40 -0700138
Charles Chana724c5132018-11-27 21:33:33 +0800139 filtCacheEventExecutor.shutdown();
140 fwdCacheEventExecutor.shutdown();
141 nextCacheEventExecutor.shutdown();
142
Charles Chana7903c82018-03-15 20:14:16 -0700143 super.deactivate();
144 }
145
146 /**
147 * Processes given objective on given device.
148 * Objectives submitted through this method are guaranteed to be executed in order.
149 *
150 * @param deviceId Device ID
151 * @param originalObjective Flow objective to be executed
152 */
153 private void process(DeviceId deviceId, Objective originalObjective) {
154 // Inject ObjectiveContext such that we can get notified when it is completed
155 Objective.Builder objBuilder = originalObjective.copy();
156 Optional<ObjectiveContext> originalContext = originalObjective.context();
157 ObjectiveContext context = new ObjectiveContext() {
158 @Override
159 public void onSuccess(Objective objective) {
160 log.trace("Flow objective onSuccess {}", objective);
Charles Chan35b589c2018-04-26 16:00:23 -0400161 dequeue(deviceId, objective, null);
Charles Chana7903c82018-03-15 20:14:16 -0700162 originalContext.ifPresent(c -> c.onSuccess(objective));
163 }
164 @Override
165 public void onError(Objective objective, ObjectiveError error) {
Charles Chan35b589c2018-04-26 16:00:23 -0400166 log.warn("Flow objective onError {}. Reason = {}", objective, error);
167 dequeue(deviceId, objective, error);
Charles Chana7903c82018-03-15 20:14:16 -0700168 originalContext.ifPresent(c -> c.onError(objective, error));
169 }
170 };
171
172 // Preserve Objective.Operation
173 Objective objective;
174 switch (originalObjective.op()) {
175 case ADD:
176 objective = objBuilder.add(context);
177 break;
178 case ADD_TO_EXISTING:
179 objective = ((NextObjective.Builder) objBuilder).addToExisting(context);
180 break;
181 case REMOVE:
182 objective = objBuilder.remove(context);
183 break;
184 case REMOVE_FROM_EXISTING:
185 objective = ((NextObjective.Builder) objBuilder).removeFromExisting(context);
186 break;
187 case MODIFY:
188 objective = ((NextObjective.Builder) objBuilder).modify(context);
189 break;
190 case VERIFY:
191 objective = ((NextObjective.Builder) objBuilder).verify(context);
192 break;
193 default:
194 log.error("Unknown flow objecitve operation {}", originalObjective.op());
195 return;
196 }
197
198 enqueue(deviceId, objective);
199 }
200
201 @Override
202 public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
203 process(deviceId, filteringObjective);
204 }
205
206 @Override
207 public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
208 process(deviceId, forwardingObjective);
209 }
210
211 @Override
212 public void next(DeviceId deviceId, NextObjective nextObjective) {
213 process(deviceId, nextObjective);
214 }
215
Charles Chan33f4a912018-04-19 23:35:30 -0700216 @Override
217 public ListMultimap<FilteringObjQueueKey, Objective> getFilteringObjQueue() {
218 return filtObjQueue;
219 }
220
221 @Override
222 public ListMultimap<ForwardingObjQueueKey, Objective> getForwardingObjQueue() {
223 return fwdObjQueue;
224 }
225
226 @Override
227 public ListMultimap<NextObjQueueKey, Objective> getNextObjQueue() {
228 return nextObjQueue;
229 }
230
231 @Override
232 public Map<FilteringObjQueueKey, Objective> getFilteringObjQueueHead() {
233 return filtObjQueueHead.asMap();
234 }
235
236 @Override
237 public Map<ForwardingObjQueueKey, Objective> getForwardingObjQueueHead() {
238 return fwdObjQueueHead.asMap();
239 }
240
241 @Override
242 public Map<NextObjQueueKey, Objective> getNextObjQueueHead() {
243 return nextObjQueueHead.asMap();
244 }
245
246 @Override
247 public void clearQueue() {
248 filtObjQueueHead.invalidateAll();
249 fwdObjQueueHead.invalidateAll();
250 nextObjQueueHead.invalidateAll();
251
252 filtObjQueueHead.cleanUp();
253 fwdObjQueueHead.cleanUp();
254 nextObjQueueHead.cleanUp();
255
256 filtObjQueue.clear();
257 fwdObjQueue.clear();
258 nextObjQueue.clear();
259 }
260
Charles Chana7903c82018-03-15 20:14:16 -0700261 /**
262 * Enqueue flow objective. Execute the flow objective if there is no pending objective ahead.
263 *
264 * @param deviceId Device ID
265 * @param obj Flow objective
266 */
267 private synchronized void enqueue(DeviceId deviceId, Objective obj) {
268 int queueSize;
269 int priority = obj.priority();
270
Charles Chanc09ad6d2018-04-04 16:31:23 -0700271 LogLevel logLevel = (obj.op() == Objective.Operation.VERIFY) ? LogLevel.TRACE : LogLevel.DEBUG;
272 Tools.log(log, logLevel, "Enqueue {}", obj);
273
Charles Chana7903c82018-03-15 20:14:16 -0700274 if (obj instanceof FilteringObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700275 FilteringObjQueueKey k = new FilteringObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
Charles Chana7903c82018-03-15 20:14:16 -0700276 filtObjQueue.put(k, obj);
277 queueSize = filtObjQueue.get(k).size();
Charles Chana7903c82018-03-15 20:14:16 -0700278 } else if (obj instanceof ForwardingObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700279 ForwardingObjQueueKey k =
280 new ForwardingObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
Charles Chana7903c82018-03-15 20:14:16 -0700281 fwdObjQueue.put(k, obj);
282 queueSize = fwdObjQueue.get(k).size();
Charles Chana7903c82018-03-15 20:14:16 -0700283 } else if (obj instanceof NextObjective) {
Charles Chana7903c82018-03-15 20:14:16 -0700284 NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
285 nextObjQueue.put(k, obj);
286 queueSize = nextObjQueue.get(k).size();
Charles Chana7903c82018-03-15 20:14:16 -0700287 } else {
288 log.error("Unknown flow objective instance: {}", obj.getClass().getName());
289 return;
290 }
Charles Chanc09ad6d2018-04-04 16:31:23 -0700291 log.trace("{} queue size {}", obj.getClass().getSimpleName(), queueSize);
Charles Chana7903c82018-03-15 20:14:16 -0700292
293 // Execute immediately if there is no pending obj ahead
294 if (queueSize == 1) {
Charles Chana7903c82018-03-15 20:14:16 -0700295 execute(deviceId, obj);
296 }
297 }
298
299 /**
300 * Dequeue flow objective. Execute the next flow objective in the queue, if any.
301 *
302 * @param deviceId Device ID
303 * @param obj Flow objective
Charles Chan35b589c2018-04-26 16:00:23 -0400304 * @param error ObjectiveError that triggers this dequeue. Null if this is not triggered by an error.
Charles Chana7903c82018-03-15 20:14:16 -0700305 */
Charles Chan35b589c2018-04-26 16:00:23 -0400306 private synchronized void dequeue(DeviceId deviceId, Objective obj, ObjectiveError error) {
Charles Chana7903c82018-03-15 20:14:16 -0700307 List<Objective> remaining;
308 int priority = obj.priority();
309
Charles Chanc09ad6d2018-04-04 16:31:23 -0700310 LogLevel logLevel = (obj.op() == Objective.Operation.VERIFY) ? LogLevel.TRACE : LogLevel.DEBUG;
311 Tools.log(log, logLevel, "Dequeue {}", obj);
312
Charles Chana7903c82018-03-15 20:14:16 -0700313 if (obj instanceof FilteringObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700314 FilteringObjQueueKey k = new FilteringObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
Charles Chana724c5132018-11-27 21:33:33 +0800315 if (!Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
Charles Chan5cf77f52018-11-07 15:05:59 -0800316 filtObjQueueHead.invalidate(k);
317 }
Charles Chana7903c82018-03-15 20:14:16 -0700318 filtObjQueue.remove(k, obj);
319 remaining = filtObjQueue.get(k);
Charles Chana7903c82018-03-15 20:14:16 -0700320 } else if (obj instanceof ForwardingObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700321 ForwardingObjQueueKey k =
322 new ForwardingObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
Charles Chana724c5132018-11-27 21:33:33 +0800323 if (!Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
Charles Chan5cf77f52018-11-07 15:05:59 -0800324 fwdObjQueueHead.invalidate(k);
325 }
Charles Chana7903c82018-03-15 20:14:16 -0700326 fwdObjQueue.remove(k, obj);
327 remaining = fwdObjQueue.get(k);
Charles Chana7903c82018-03-15 20:14:16 -0700328 } else if (obj instanceof NextObjective) {
Charles Chan35b589c2018-04-26 16:00:23 -0400329 if (error != null) {
330 // Remove pendingForwards and pendingNexts if next objective failed
331 Set<PendingFlowObjective> removedForwards = pendingForwards.remove(obj.id());
332 List<PendingFlowObjective> removedNexts = pendingNexts.remove(obj.id());
333
334 if (removedForwards != null) {
335 removedForwards.stream().map(PendingFlowObjective::flowObjective)
336 .forEach(pendingObj -> pendingObj.context().ifPresent(c ->
337 c.onError(pendingObj, error)));
338 }
339 if (removedNexts != null) {
340 removedNexts.stream().map(PendingFlowObjective::flowObjective)
341 .forEach(pendingObj -> pendingObj.context().ifPresent(c ->
342 c.onError(pendingObj, error)));
343 }
344 }
Charles Chana7903c82018-03-15 20:14:16 -0700345 NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
Charles Chana724c5132018-11-27 21:33:33 +0800346 if (!Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
Charles Chan5cf77f52018-11-07 15:05:59 -0800347 nextObjQueueHead.invalidate(k);
348 }
Charles Chana7903c82018-03-15 20:14:16 -0700349 nextObjQueue.remove(k, obj);
350 remaining = nextObjQueue.get(k);
Charles Chana7903c82018-03-15 20:14:16 -0700351 } else {
352 log.error("Unknown flow objective instance: {}", obj.getClass().getName());
353 return;
354 }
Charles Chanc09ad6d2018-04-04 16:31:23 -0700355 log.trace("{} queue size {}", obj.getClass().getSimpleName(), remaining.size());
Charles Chana7903c82018-03-15 20:14:16 -0700356
357 // Submit the next one in the queue, if any
358 if (remaining.size() > 0) {
Charles Chana7903c82018-03-15 20:14:16 -0700359 execute(deviceId, remaining.get(0));
360 }
361 }
362
363 /**
364 * Submit the flow objective. Starting from this point on, the execution order is not guaranteed.
365 * Therefore we must be certain that this method is called in-order.
366 *
367 * @param deviceId Device ID
368 * @param obj Flow objective
369 */
370 private void execute(DeviceId deviceId, Objective obj) {
Charles Chanc09ad6d2018-04-04 16:31:23 -0700371 LogLevel logLevel = (obj.op() == Objective.Operation.VERIFY) ? LogLevel.TRACE : LogLevel.DEBUG;
372 Tools.log(log, logLevel, "Submit objective installer, deviceId {}, obj {}", deviceId, obj);
373
Charles Chan45c19d72018-04-19 21:38:40 -0700374 int priority = obj.priority();
Charles Chana7903c82018-03-15 20:14:16 -0700375 if (obj instanceof FilteringObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700376 FilteringObjQueueKey k = new FilteringObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
Charles Chan45c19d72018-04-19 21:38:40 -0700377 filtObjQueueHead.put(k, obj);
Charles Chana7903c82018-03-15 20:14:16 -0700378 super.filter(deviceId, (FilteringObjective) obj);
379 } else if (obj instanceof ForwardingObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700380 ForwardingObjQueueKey k =
381 new ForwardingObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
Charles Chan45c19d72018-04-19 21:38:40 -0700382 fwdObjQueueHead.put(k, obj);
Charles Chana7903c82018-03-15 20:14:16 -0700383 super.forward(deviceId, (ForwardingObjective) obj);
384 } else if (obj instanceof NextObjective) {
Charles Chan45c19d72018-04-19 21:38:40 -0700385 NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
386 nextObjQueueHead.put(k, obj);
Charles Chana7903c82018-03-15 20:14:16 -0700387 super.next(deviceId, (NextObjective) obj);
388 } else {
389 log.error("Unknown flow objective instance: {}", obj.getClass().getName());
390 }
391 }
392
393 private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
394 @Override
395 public void notify(ObjectiveEvent event) {
396 if (event.type() == ObjectiveEvent.Type.ADD) {
397 log.debug("Received notification of obj event {}", event);
398 Set<PendingFlowObjective> pending;
399
400 // first send all pending flows
401 synchronized (pendingForwards) {
402 // needs to be synchronized for queueObjective lookup
403 pending = pendingForwards.remove(event.subject());
404 }
405 if (pending == null) {
Charles Chanc09ad6d2018-04-04 16:31:23 -0700406 log.debug("No forwarding objectives pending for this obj event {}", event);
Charles Chana7903c82018-03-15 20:14:16 -0700407 } else {
408 log.debug("Processing {} pending forwarding objectives for nextId {}",
409 pending.size(), event.subject());
Charles Chan45c19d72018-04-19 21:38:40 -0700410 // execute pending forwards one by one
Charles Chana7903c82018-03-15 20:14:16 -0700411 pending.forEach(p -> execute(p.deviceId(), p.flowObjective()));
412 }
413
414 // now check for pending next-objectives
415 // Note: This is still necessary despite the existence of in-order execution.
416 // Since the in-order execution does not handle the case of
417 // ADD_TO_EXISTING coming before ADD
418 List<PendingFlowObjective> pendNexts;
419 synchronized (pendingNexts) {
420 // needs to be synchronized for queueObjective lookup
421 pendNexts = pendingNexts.remove(event.subject());
422 }
423 if (pendNexts == null) {
Charles Chanc09ad6d2018-04-04 16:31:23 -0700424 log.debug("No next objectives pending for this obj event {}", event);
Charles Chana7903c82018-03-15 20:14:16 -0700425 } else {
426 log.debug("Processing {} pending next objectives for nextId {}",
427 pendNexts.size(), event.subject());
Charles Chan45c19d72018-04-19 21:38:40 -0700428 // execute pending nexts one by one
Charles Chana7903c82018-03-15 20:14:16 -0700429 pendNexts.forEach(p -> execute(p.deviceId(), p.flowObjective()));
430 }
431 }
432 }
433 }
Charles Chana7903c82018-03-15 20:14:16 -0700434}