blob: 457c6e6e8b1002ca96ef460f7863d3e695aba30d [file] [log] [blame]
Charles Chana7903c82018-03-15 20:14:16 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.net.flowobjective.impl;
18
Charles Chan45c19d72018-04-19 21:38:40 -070019import com.google.common.cache.Cache;
20import com.google.common.cache.CacheBuilder;
Charles Chana724c5132018-11-27 21:33:33 +080021import com.google.common.cache.RemovalListener;
22import com.google.common.cache.RemovalListeners;
Charles Chana7903c82018-03-15 20:14:16 -070023import com.google.common.collect.ArrayListMultimap;
24import com.google.common.collect.ListMultimap;
25import com.google.common.collect.Multimaps;
Charles Chanc09ad6d2018-04-04 16:31:23 -070026import org.onlab.util.Tools;
27import org.onlab.util.Tools.LogLevel;
Charles Chana7903c82018-03-15 20:14:16 -070028import org.onosproject.net.DeviceId;
Charles Chan33f4a912018-04-19 23:35:30 -070029import org.onosproject.net.flowobjective.FilteringObjQueueKey;
Charles Chana7903c82018-03-15 20:14:16 -070030import org.onosproject.net.flowobjective.FilteringObjective;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070031import org.onosproject.net.flowobjective.FlowObjectiveService;
Charles Chana7903c82018-03-15 20:14:16 -070032import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
Charles Chan33f4a912018-04-19 23:35:30 -070033import org.onosproject.net.flowobjective.ForwardingObjQueueKey;
Charles Chana7903c82018-03-15 20:14:16 -070034import org.onosproject.net.flowobjective.ForwardingObjective;
Charles Chan33f4a912018-04-19 23:35:30 -070035import org.onosproject.net.flowobjective.NextObjQueueKey;
Charles Chana7903c82018-03-15 20:14:16 -070036import org.onosproject.net.flowobjective.NextObjective;
37import org.onosproject.net.flowobjective.Objective;
38import org.onosproject.net.flowobjective.ObjectiveContext;
39import org.onosproject.net.flowobjective.ObjectiveError;
40import org.onosproject.net.flowobjective.ObjectiveEvent;
Charles Chana724c5132018-11-27 21:33:33 +080041import org.onosproject.net.flowobjective.ObjectiveQueueKey;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070042import org.osgi.service.component.annotations.Activate;
43import org.osgi.service.component.annotations.Component;
44import org.osgi.service.component.annotations.Deactivate;
Charles Chana7903c82018-03-15 20:14:16 -070045import org.slf4j.Logger;
46import org.slf4j.LoggerFactory;
47
48import java.util.List;
Charles Chan33f4a912018-04-19 23:35:30 -070049import java.util.Map;
Charles Chana724c5132018-11-27 21:33:33 +080050import java.util.Objects;
Charles Chana7903c82018-03-15 20:14:16 -070051import java.util.Optional;
52import java.util.Set;
Charles Chana724c5132018-11-27 21:33:33 +080053import java.util.concurrent.ExecutorService;
Charles Chan45c19d72018-04-19 21:38:40 -070054import java.util.concurrent.ScheduledExecutorService;
55import java.util.concurrent.TimeUnit;
56
Charles Chana724c5132018-11-27 21:33:33 +080057import static java.util.concurrent.Executors.newSingleThreadExecutor;
Charles Chan45c19d72018-04-19 21:38:40 -070058import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
59import static org.onlab.util.Tools.groupedThreads;
Charles Chana7903c82018-03-15 20:14:16 -070060
Ray Milkeyd84f89b2018-08-17 14:54:17 -070061@Component(immediate = true, service = FlowObjectiveService.class)
Charles Chana7903c82018-03-15 20:14:16 -070062public class InOrderFlowObjectiveManager extends FlowObjectiveManager {
63 private final Logger log = LoggerFactory.getLogger(getClass());
64
Charles Chan45c19d72018-04-19 21:38:40 -070065 // TODO Make queue timeout configurable
Charles Chan1491b9b2018-11-27 21:33:33 +080066 static int objTimeoutMs = 15000;
Charles Chan45c19d72018-04-19 21:38:40 -070067
Charles Chan33f4a912018-04-19 23:35:30 -070068 private Cache<FilteringObjQueueKey, Objective> filtObjQueueHead;
69 private Cache<ForwardingObjQueueKey, Objective> fwdObjQueueHead;
Charles Chan45c19d72018-04-19 21:38:40 -070070 private Cache<NextObjQueueKey, Objective> nextObjQueueHead;
71 private ScheduledExecutorService cacheCleaner;
Charles Chana724c5132018-11-27 21:33:33 +080072 private ExecutorService filtCacheEventExecutor;
73 private ExecutorService fwdCacheEventExecutor;
74 private ExecutorService nextCacheEventExecutor;
Charles Chan45c19d72018-04-19 21:38:40 -070075
Charles Chan33f4a912018-04-19 23:35:30 -070076 private ListMultimap<FilteringObjQueueKey, Objective> filtObjQueue =
Charles Chana7903c82018-03-15 20:14:16 -070077 Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
Charles Chan33f4a912018-04-19 23:35:30 -070078 private ListMultimap<ForwardingObjQueueKey, Objective> fwdObjQueue =
Charles Chana7903c82018-03-15 20:14:16 -070079 Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
80 private ListMultimap<NextObjQueueKey, Objective> nextObjQueue =
81 Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
82
83 final FlowObjectiveStoreDelegate delegate = new InternalStoreDelegate();
84
85 @Activate
86 protected void activate() {
87 super.activate();
Charles Chan45c19d72018-04-19 21:38:40 -070088
Charles Chana724c5132018-11-27 21:33:33 +080089 filtCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-filt", log));
90 fwdCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-fwd", log));
91 nextCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-next", log));
92
93 RemovalListener<ObjectiveQueueKey, Objective> removalListener = notification -> {
94 Objective obj = notification.getValue();
95 switch (notification.getCause()) {
96 case EXPIRED:
97 case COLLECTED:
98 case SIZE:
99 obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
100 break;
101 case EXPLICIT: // No action when the objective completes correctly
102 case REPLACED: // No action when a pending forward or next objective gets executed
103 default:
104 break;
105 }
106 };
Charles Chan45c19d72018-04-19 21:38:40 -0700107 filtObjQueueHead = CacheBuilder.newBuilder()
Charles Chan1491b9b2018-11-27 21:33:33 +0800108 .expireAfterWrite(objTimeoutMs, TimeUnit.MILLISECONDS)
Charles Chana724c5132018-11-27 21:33:33 +0800109 .removalListener(RemovalListeners.asynchronous(removalListener, filtCacheEventExecutor))
110 .build();
Charles Chan45c19d72018-04-19 21:38:40 -0700111 fwdObjQueueHead = CacheBuilder.newBuilder()
Charles Chan1491b9b2018-11-27 21:33:33 +0800112 .expireAfterWrite(objTimeoutMs, TimeUnit.MILLISECONDS)
Charles Chana724c5132018-11-27 21:33:33 +0800113 .removalListener(RemovalListeners.asynchronous(removalListener, fwdCacheEventExecutor))
114 .build();
Charles Chan45c19d72018-04-19 21:38:40 -0700115 nextObjQueueHead = CacheBuilder.newBuilder()
Charles Chan1491b9b2018-11-27 21:33:33 +0800116 .expireAfterWrite(objTimeoutMs, TimeUnit.MILLISECONDS)
Charles Chana724c5132018-11-27 21:33:33 +0800117 .removalListener(RemovalListeners.asynchronous(removalListener, nextCacheEventExecutor))
118 .build();
Charles Chan45c19d72018-04-19 21:38:40 -0700119
120 cacheCleaner = newSingleThreadScheduledExecutor(groupedThreads("onos/flowobj", "cache-cleaner", log));
121 cacheCleaner.scheduleAtFixedRate(() -> {
122 filtObjQueueHead.cleanUp();
123 fwdObjQueueHead.cleanUp();
124 nextObjQueueHead.cleanUp();
Charles Chan1491b9b2018-11-27 21:33:33 +0800125 }, 0, objTimeoutMs, TimeUnit.MILLISECONDS);
Charles Chan45c19d72018-04-19 21:38:40 -0700126
Charles Chana7903c82018-03-15 20:14:16 -0700127 // Replace store delegate to make sure pendingForward and pendingNext are resubmitted to
Charles Chan45c19d72018-04-19 21:38:40 -0700128 // execute()
Charles Chana7903c82018-03-15 20:14:16 -0700129 flowObjectiveStore.unsetDelegate(super.delegate);
130 flowObjectiveStore.setDelegate(delegate);
131 }
132
133 @Deactivate
134 protected void deactivate() {
Charles Chan45c19d72018-04-19 21:38:40 -0700135 cacheCleaner.shutdown();
Charles Chan33f4a912018-04-19 23:35:30 -0700136 clearQueue();
Charles Chan45c19d72018-04-19 21:38:40 -0700137
Charles Chana724c5132018-11-27 21:33:33 +0800138 filtCacheEventExecutor.shutdown();
139 fwdCacheEventExecutor.shutdown();
140 nextCacheEventExecutor.shutdown();
141
Charles Chana7903c82018-03-15 20:14:16 -0700142 super.deactivate();
143 }
144
145 /**
146 * Processes given objective on given device.
147 * Objectives submitted through this method are guaranteed to be executed in order.
148 *
149 * @param deviceId Device ID
150 * @param originalObjective Flow objective to be executed
151 */
152 private void process(DeviceId deviceId, Objective originalObjective) {
153 // Inject ObjectiveContext such that we can get notified when it is completed
154 Objective.Builder objBuilder = originalObjective.copy();
155 Optional<ObjectiveContext> originalContext = originalObjective.context();
156 ObjectiveContext context = new ObjectiveContext() {
157 @Override
158 public void onSuccess(Objective objective) {
159 log.trace("Flow objective onSuccess {}", objective);
Charles Chan35b589c2018-04-26 16:00:23 -0400160 dequeue(deviceId, objective, null);
Charles Chana7903c82018-03-15 20:14:16 -0700161 originalContext.ifPresent(c -> c.onSuccess(objective));
162 }
163 @Override
164 public void onError(Objective objective, ObjectiveError error) {
Charles Chan35b589c2018-04-26 16:00:23 -0400165 log.warn("Flow objective onError {}. Reason = {}", objective, error);
166 dequeue(deviceId, objective, error);
Charles Chana7903c82018-03-15 20:14:16 -0700167 originalContext.ifPresent(c -> c.onError(objective, error));
168 }
169 };
170
171 // Preserve Objective.Operation
172 Objective objective;
173 switch (originalObjective.op()) {
174 case ADD:
175 objective = objBuilder.add(context);
176 break;
177 case ADD_TO_EXISTING:
178 objective = ((NextObjective.Builder) objBuilder).addToExisting(context);
179 break;
180 case REMOVE:
181 objective = objBuilder.remove(context);
182 break;
183 case REMOVE_FROM_EXISTING:
184 objective = ((NextObjective.Builder) objBuilder).removeFromExisting(context);
185 break;
186 case MODIFY:
187 objective = ((NextObjective.Builder) objBuilder).modify(context);
188 break;
189 case VERIFY:
190 objective = ((NextObjective.Builder) objBuilder).verify(context);
191 break;
192 default:
193 log.error("Unknown flow objecitve operation {}", originalObjective.op());
194 return;
195 }
196
197 enqueue(deviceId, objective);
198 }
199
200 @Override
201 public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
202 process(deviceId, filteringObjective);
203 }
204
205 @Override
206 public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
207 process(deviceId, forwardingObjective);
208 }
209
210 @Override
211 public void next(DeviceId deviceId, NextObjective nextObjective) {
212 process(deviceId, nextObjective);
213 }
214
Charles Chan33f4a912018-04-19 23:35:30 -0700215 @Override
216 public ListMultimap<FilteringObjQueueKey, Objective> getFilteringObjQueue() {
217 return filtObjQueue;
218 }
219
220 @Override
221 public ListMultimap<ForwardingObjQueueKey, Objective> getForwardingObjQueue() {
222 return fwdObjQueue;
223 }
224
225 @Override
226 public ListMultimap<NextObjQueueKey, Objective> getNextObjQueue() {
227 return nextObjQueue;
228 }
229
230 @Override
231 public Map<FilteringObjQueueKey, Objective> getFilteringObjQueueHead() {
232 return filtObjQueueHead.asMap();
233 }
234
235 @Override
236 public Map<ForwardingObjQueueKey, Objective> getForwardingObjQueueHead() {
237 return fwdObjQueueHead.asMap();
238 }
239
240 @Override
241 public Map<NextObjQueueKey, Objective> getNextObjQueueHead() {
242 return nextObjQueueHead.asMap();
243 }
244
245 @Override
246 public void clearQueue() {
247 filtObjQueueHead.invalidateAll();
248 fwdObjQueueHead.invalidateAll();
249 nextObjQueueHead.invalidateAll();
250
251 filtObjQueueHead.cleanUp();
252 fwdObjQueueHead.cleanUp();
253 nextObjQueueHead.cleanUp();
254
255 filtObjQueue.clear();
256 fwdObjQueue.clear();
257 nextObjQueue.clear();
258 }
259
Charles Chana7903c82018-03-15 20:14:16 -0700260 /**
261 * Enqueue flow objective. Execute the flow objective if there is no pending objective ahead.
262 *
263 * @param deviceId Device ID
264 * @param obj Flow objective
265 */
266 private synchronized void enqueue(DeviceId deviceId, Objective obj) {
267 int queueSize;
268 int priority = obj.priority();
269
Charles Chanc09ad6d2018-04-04 16:31:23 -0700270 LogLevel logLevel = (obj.op() == Objective.Operation.VERIFY) ? LogLevel.TRACE : LogLevel.DEBUG;
271 Tools.log(log, logLevel, "Enqueue {}", obj);
272
Charles Chana7903c82018-03-15 20:14:16 -0700273 if (obj instanceof FilteringObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700274 FilteringObjQueueKey k = new FilteringObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
Charles Chana7903c82018-03-15 20:14:16 -0700275 filtObjQueue.put(k, obj);
276 queueSize = filtObjQueue.get(k).size();
Charles Chana7903c82018-03-15 20:14:16 -0700277 } else if (obj instanceof ForwardingObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700278 ForwardingObjQueueKey k =
279 new ForwardingObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
Charles Chana7903c82018-03-15 20:14:16 -0700280 fwdObjQueue.put(k, obj);
281 queueSize = fwdObjQueue.get(k).size();
Charles Chana7903c82018-03-15 20:14:16 -0700282 } else if (obj instanceof NextObjective) {
Charles Chana7903c82018-03-15 20:14:16 -0700283 NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
284 nextObjQueue.put(k, obj);
285 queueSize = nextObjQueue.get(k).size();
Charles Chana7903c82018-03-15 20:14:16 -0700286 } else {
287 log.error("Unknown flow objective instance: {}", obj.getClass().getName());
288 return;
289 }
Charles Chanc09ad6d2018-04-04 16:31:23 -0700290 log.trace("{} queue size {}", obj.getClass().getSimpleName(), queueSize);
Charles Chana7903c82018-03-15 20:14:16 -0700291
292 // Execute immediately if there is no pending obj ahead
293 if (queueSize == 1) {
Charles Chana7903c82018-03-15 20:14:16 -0700294 execute(deviceId, obj);
295 }
296 }
297
298 /**
299 * Dequeue flow objective. Execute the next flow objective in the queue, if any.
300 *
301 * @param deviceId Device ID
302 * @param obj Flow objective
Charles Chan35b589c2018-04-26 16:00:23 -0400303 * @param error ObjectiveError that triggers this dequeue. Null if this is not triggered by an error.
Charles Chana7903c82018-03-15 20:14:16 -0700304 */
Charles Chan35b589c2018-04-26 16:00:23 -0400305 private synchronized void dequeue(DeviceId deviceId, Objective obj, ObjectiveError error) {
Charles Chana7903c82018-03-15 20:14:16 -0700306 List<Objective> remaining;
307 int priority = obj.priority();
308
Charles Chanc09ad6d2018-04-04 16:31:23 -0700309 LogLevel logLevel = (obj.op() == Objective.Operation.VERIFY) ? LogLevel.TRACE : LogLevel.DEBUG;
310 Tools.log(log, logLevel, "Dequeue {}", obj);
311
Charles Chana7903c82018-03-15 20:14:16 -0700312 if (obj instanceof FilteringObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700313 FilteringObjQueueKey k = new FilteringObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
Charles Chana724c5132018-11-27 21:33:33 +0800314 if (!Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
Charles Chan5cf77f52018-11-07 15:05:59 -0800315 filtObjQueueHead.invalidate(k);
316 }
Charles Chana7903c82018-03-15 20:14:16 -0700317 filtObjQueue.remove(k, obj);
318 remaining = filtObjQueue.get(k);
Charles Chana7903c82018-03-15 20:14:16 -0700319 } else if (obj instanceof ForwardingObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700320 ForwardingObjQueueKey k =
321 new ForwardingObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
Charles Chana724c5132018-11-27 21:33:33 +0800322 if (!Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
Charles Chan5cf77f52018-11-07 15:05:59 -0800323 fwdObjQueueHead.invalidate(k);
324 }
Charles Chana7903c82018-03-15 20:14:16 -0700325 fwdObjQueue.remove(k, obj);
326 remaining = fwdObjQueue.get(k);
Charles Chana7903c82018-03-15 20:14:16 -0700327 } else if (obj instanceof NextObjective) {
Charles Chan35b589c2018-04-26 16:00:23 -0400328 if (error != null) {
329 // Remove pendingForwards and pendingNexts if next objective failed
330 Set<PendingFlowObjective> removedForwards = pendingForwards.remove(obj.id());
331 List<PendingFlowObjective> removedNexts = pendingNexts.remove(obj.id());
332
333 if (removedForwards != null) {
334 removedForwards.stream().map(PendingFlowObjective::flowObjective)
335 .forEach(pendingObj -> pendingObj.context().ifPresent(c ->
336 c.onError(pendingObj, error)));
337 }
338 if (removedNexts != null) {
339 removedNexts.stream().map(PendingFlowObjective::flowObjective)
340 .forEach(pendingObj -> pendingObj.context().ifPresent(c ->
341 c.onError(pendingObj, error)));
342 }
343 }
Charles Chana7903c82018-03-15 20:14:16 -0700344 NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
Charles Chana724c5132018-11-27 21:33:33 +0800345 if (!Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
Charles Chan5cf77f52018-11-07 15:05:59 -0800346 nextObjQueueHead.invalidate(k);
347 }
Charles Chana7903c82018-03-15 20:14:16 -0700348 nextObjQueue.remove(k, obj);
349 remaining = nextObjQueue.get(k);
Charles Chana7903c82018-03-15 20:14:16 -0700350 } else {
351 log.error("Unknown flow objective instance: {}", obj.getClass().getName());
352 return;
353 }
Charles Chanc09ad6d2018-04-04 16:31:23 -0700354 log.trace("{} queue size {}", obj.getClass().getSimpleName(), remaining.size());
Charles Chana7903c82018-03-15 20:14:16 -0700355
356 // Submit the next one in the queue, if any
357 if (remaining.size() > 0) {
Charles Chana7903c82018-03-15 20:14:16 -0700358 execute(deviceId, remaining.get(0));
359 }
360 }
361
362 /**
363 * Submit the flow objective. Starting from this point on, the execution order is not guaranteed.
364 * Therefore we must be certain that this method is called in-order.
365 *
366 * @param deviceId Device ID
367 * @param obj Flow objective
368 */
369 private void execute(DeviceId deviceId, Objective obj) {
Charles Chanc09ad6d2018-04-04 16:31:23 -0700370 LogLevel logLevel = (obj.op() == Objective.Operation.VERIFY) ? LogLevel.TRACE : LogLevel.DEBUG;
371 Tools.log(log, logLevel, "Submit objective installer, deviceId {}, obj {}", deviceId, obj);
372
Charles Chan45c19d72018-04-19 21:38:40 -0700373 int priority = obj.priority();
Charles Chana7903c82018-03-15 20:14:16 -0700374 if (obj instanceof FilteringObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700375 FilteringObjQueueKey k = new FilteringObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
Charles Chan45c19d72018-04-19 21:38:40 -0700376 filtObjQueueHead.put(k, obj);
Charles Chana7903c82018-03-15 20:14:16 -0700377 super.filter(deviceId, (FilteringObjective) obj);
378 } else if (obj instanceof ForwardingObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700379 ForwardingObjQueueKey k =
380 new ForwardingObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
Charles Chan45c19d72018-04-19 21:38:40 -0700381 fwdObjQueueHead.put(k, obj);
Charles Chana7903c82018-03-15 20:14:16 -0700382 super.forward(deviceId, (ForwardingObjective) obj);
383 } else if (obj instanceof NextObjective) {
Charles Chan45c19d72018-04-19 21:38:40 -0700384 NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
385 nextObjQueueHead.put(k, obj);
Charles Chana7903c82018-03-15 20:14:16 -0700386 super.next(deviceId, (NextObjective) obj);
387 } else {
388 log.error("Unknown flow objective instance: {}", obj.getClass().getName());
389 }
390 }
391
392 private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
393 @Override
394 public void notify(ObjectiveEvent event) {
395 if (event.type() == ObjectiveEvent.Type.ADD) {
396 log.debug("Received notification of obj event {}", event);
397 Set<PendingFlowObjective> pending;
398
399 // first send all pending flows
400 synchronized (pendingForwards) {
401 // needs to be synchronized for queueObjective lookup
402 pending = pendingForwards.remove(event.subject());
403 }
404 if (pending == null) {
Charles Chanc09ad6d2018-04-04 16:31:23 -0700405 log.debug("No forwarding objectives pending for this obj event {}", event);
Charles Chana7903c82018-03-15 20:14:16 -0700406 } else {
407 log.debug("Processing {} pending forwarding objectives for nextId {}",
408 pending.size(), event.subject());
Charles Chan45c19d72018-04-19 21:38:40 -0700409 // execute pending forwards one by one
Charles Chana7903c82018-03-15 20:14:16 -0700410 pending.forEach(p -> execute(p.deviceId(), p.flowObjective()));
411 }
412
413 // now check for pending next-objectives
414 // Note: This is still necessary despite the existence of in-order execution.
415 // Since the in-order execution does not handle the case of
416 // ADD_TO_EXISTING coming before ADD
417 List<PendingFlowObjective> pendNexts;
418 synchronized (pendingNexts) {
419 // needs to be synchronized for queueObjective lookup
420 pendNexts = pendingNexts.remove(event.subject());
421 }
422 if (pendNexts == null) {
Charles Chanc09ad6d2018-04-04 16:31:23 -0700423 log.debug("No next objectives pending for this obj event {}", event);
Charles Chana7903c82018-03-15 20:14:16 -0700424 } else {
425 log.debug("Processing {} pending next objectives for nextId {}",
426 pendNexts.size(), event.subject());
Charles Chan45c19d72018-04-19 21:38:40 -0700427 // execute pending nexts one by one
Charles Chana7903c82018-03-15 20:14:16 -0700428 pendNexts.forEach(p -> execute(p.deviceId(), p.flowObjective()));
429 }
430 }
431 }
432 }
Charles Chana7903c82018-03-15 20:14:16 -0700433}