blob: 09996980e08f2fe73731cd8d54ca41ea315ab012 [file] [log] [blame]
Charles Chana7903c82018-03-15 20:14:16 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.net.flowobjective.impl;
18
Charles Chan45c19d72018-04-19 21:38:40 -070019import com.google.common.cache.Cache;
20import com.google.common.cache.CacheBuilder;
Charles Chana724c5132018-11-27 21:33:33 +080021import com.google.common.cache.RemovalListener;
22import com.google.common.cache.RemovalListeners;
Charles Chana7903c82018-03-15 20:14:16 -070023import com.google.common.collect.ArrayListMultimap;
24import com.google.common.collect.ListMultimap;
25import com.google.common.collect.Multimaps;
Charles Chanc09ad6d2018-04-04 16:31:23 -070026import org.onlab.util.Tools;
27import org.onlab.util.Tools.LogLevel;
Charles Chana7903c82018-03-15 20:14:16 -070028import org.onosproject.net.DeviceId;
Charles Chan33f4a912018-04-19 23:35:30 -070029import org.onosproject.net.flowobjective.FilteringObjQueueKey;
Charles Chana7903c82018-03-15 20:14:16 -070030import org.onosproject.net.flowobjective.FilteringObjective;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070031import org.onosproject.net.flowobjective.FlowObjectiveService;
Charles Chana7903c82018-03-15 20:14:16 -070032import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
Charles Chan33f4a912018-04-19 23:35:30 -070033import org.onosproject.net.flowobjective.ForwardingObjQueueKey;
Charles Chana7903c82018-03-15 20:14:16 -070034import org.onosproject.net.flowobjective.ForwardingObjective;
Charles Chan33f4a912018-04-19 23:35:30 -070035import org.onosproject.net.flowobjective.NextObjQueueKey;
Charles Chana7903c82018-03-15 20:14:16 -070036import org.onosproject.net.flowobjective.NextObjective;
37import org.onosproject.net.flowobjective.Objective;
38import org.onosproject.net.flowobjective.ObjectiveContext;
39import org.onosproject.net.flowobjective.ObjectiveError;
40import org.onosproject.net.flowobjective.ObjectiveEvent;
Charles Chana724c5132018-11-27 21:33:33 +080041import org.onosproject.net.flowobjective.ObjectiveQueueKey;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070042import org.osgi.service.component.annotations.Activate;
43import org.osgi.service.component.annotations.Component;
44import org.osgi.service.component.annotations.Deactivate;
pier8b3aef42019-03-11 15:14:02 -070045import org.osgi.service.component.ComponentContext;
Charles Chana7903c82018-03-15 20:14:16 -070046import org.slf4j.Logger;
47import org.slf4j.LoggerFactory;
48
49import java.util.List;
Charles Chan33f4a912018-04-19 23:35:30 -070050import java.util.Map;
Charles Chana724c5132018-11-27 21:33:33 +080051import java.util.Objects;
Charles Chana7903c82018-03-15 20:14:16 -070052import java.util.Optional;
53import java.util.Set;
Charles Chana724c5132018-11-27 21:33:33 +080054import java.util.concurrent.ExecutorService;
Charles Chan45c19d72018-04-19 21:38:40 -070055import java.util.concurrent.ScheduledExecutorService;
56import java.util.concurrent.TimeUnit;
Charles Chanc51852e2019-01-07 19:59:10 -080057import java.util.concurrent.atomic.AtomicBoolean;
Charles Chan45c19d72018-04-19 21:38:40 -070058
Charles Chana724c5132018-11-27 21:33:33 +080059import static java.util.concurrent.Executors.newSingleThreadExecutor;
Charles Chan45c19d72018-04-19 21:38:40 -070060import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
61import static org.onlab.util.Tools.groupedThreads;
Charles Chana7903c82018-03-15 20:14:16 -070062
Ray Milkeyd84f89b2018-08-17 14:54:17 -070063@Component(immediate = true, service = FlowObjectiveService.class)
Charles Chana7903c82018-03-15 20:14:16 -070064public class InOrderFlowObjectiveManager extends FlowObjectiveManager {
65 private final Logger log = LoggerFactory.getLogger(getClass());
66
Charles Chan45c19d72018-04-19 21:38:40 -070067 // TODO Make queue timeout configurable
Charles Chanb4f4fdb2018-12-21 13:55:29 -080068 static final int DEFAULT_OBJ_TIMEOUT = 15000;
69 int objTimeoutMs = DEFAULT_OBJ_TIMEOUT;
Charles Chan45c19d72018-04-19 21:38:40 -070070
Charles Chan33f4a912018-04-19 23:35:30 -070071 private Cache<FilteringObjQueueKey, Objective> filtObjQueueHead;
72 private Cache<ForwardingObjQueueKey, Objective> fwdObjQueueHead;
Charles Chan45c19d72018-04-19 21:38:40 -070073 private Cache<NextObjQueueKey, Objective> nextObjQueueHead;
74 private ScheduledExecutorService cacheCleaner;
Charles Chana724c5132018-11-27 21:33:33 +080075 private ExecutorService filtCacheEventExecutor;
76 private ExecutorService fwdCacheEventExecutor;
77 private ExecutorService nextCacheEventExecutor;
Charles Chan45c19d72018-04-19 21:38:40 -070078
Charles Chan33f4a912018-04-19 23:35:30 -070079 private ListMultimap<FilteringObjQueueKey, Objective> filtObjQueue =
Charles Chana7903c82018-03-15 20:14:16 -070080 Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
Charles Chan33f4a912018-04-19 23:35:30 -070081 private ListMultimap<ForwardingObjQueueKey, Objective> fwdObjQueue =
Charles Chana7903c82018-03-15 20:14:16 -070082 Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
83 private ListMultimap<NextObjQueueKey, Objective> nextObjQueue =
84 Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
85
86 final FlowObjectiveStoreDelegate delegate = new InternalStoreDelegate();
87
88 @Activate
pier8b3aef42019-03-11 15:14:02 -070089 protected void activate(ComponentContext context) {
90 super.activate(context);
Charles Chan45c19d72018-04-19 21:38:40 -070091
Charles Chana724c5132018-11-27 21:33:33 +080092 filtCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-filt", log));
93 fwdCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-fwd", log));
94 nextCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-next", log));
95
96 RemovalListener<ObjectiveQueueKey, Objective> removalListener = notification -> {
97 Objective obj = notification.getValue();
98 switch (notification.getCause()) {
99 case EXPIRED:
100 case COLLECTED:
101 case SIZE:
102 obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
103 break;
104 case EXPLICIT: // No action when the objective completes correctly
105 case REPLACED: // No action when a pending forward or next objective gets executed
106 default:
107 break;
108 }
109 };
Charles Chan45c19d72018-04-19 21:38:40 -0700110 filtObjQueueHead = CacheBuilder.newBuilder()
Charles Chan1491b9b2018-11-27 21:33:33 +0800111 .expireAfterWrite(objTimeoutMs, TimeUnit.MILLISECONDS)
Charles Chana724c5132018-11-27 21:33:33 +0800112 .removalListener(RemovalListeners.asynchronous(removalListener, filtCacheEventExecutor))
113 .build();
Charles Chan45c19d72018-04-19 21:38:40 -0700114 fwdObjQueueHead = CacheBuilder.newBuilder()
Charles Chan1491b9b2018-11-27 21:33:33 +0800115 .expireAfterWrite(objTimeoutMs, TimeUnit.MILLISECONDS)
Charles Chana724c5132018-11-27 21:33:33 +0800116 .removalListener(RemovalListeners.asynchronous(removalListener, fwdCacheEventExecutor))
117 .build();
Charles Chan45c19d72018-04-19 21:38:40 -0700118 nextObjQueueHead = CacheBuilder.newBuilder()
Charles Chan1491b9b2018-11-27 21:33:33 +0800119 .expireAfterWrite(objTimeoutMs, TimeUnit.MILLISECONDS)
Charles Chana724c5132018-11-27 21:33:33 +0800120 .removalListener(RemovalListeners.asynchronous(removalListener, nextCacheEventExecutor))
121 .build();
Charles Chan45c19d72018-04-19 21:38:40 -0700122
123 cacheCleaner = newSingleThreadScheduledExecutor(groupedThreads("onos/flowobj", "cache-cleaner", log));
124 cacheCleaner.scheduleAtFixedRate(() -> {
125 filtObjQueueHead.cleanUp();
126 fwdObjQueueHead.cleanUp();
127 nextObjQueueHead.cleanUp();
Charles Chan1491b9b2018-11-27 21:33:33 +0800128 }, 0, objTimeoutMs, TimeUnit.MILLISECONDS);
Charles Chan45c19d72018-04-19 21:38:40 -0700129
Charles Chana7903c82018-03-15 20:14:16 -0700130 // Replace store delegate to make sure pendingForward and pendingNext are resubmitted to
Charles Chan45c19d72018-04-19 21:38:40 -0700131 // execute()
Charles Chana7903c82018-03-15 20:14:16 -0700132 flowObjectiveStore.unsetDelegate(super.delegate);
133 flowObjectiveStore.setDelegate(delegate);
134 }
135
136 @Deactivate
137 protected void deactivate() {
Charles Chan45c19d72018-04-19 21:38:40 -0700138 cacheCleaner.shutdown();
Charles Chan33f4a912018-04-19 23:35:30 -0700139 clearQueue();
Charles Chan45c19d72018-04-19 21:38:40 -0700140
Charles Chana724c5132018-11-27 21:33:33 +0800141 filtCacheEventExecutor.shutdown();
142 fwdCacheEventExecutor.shutdown();
143 nextCacheEventExecutor.shutdown();
144
Charles Chana7903c82018-03-15 20:14:16 -0700145 super.deactivate();
146 }
147
148 /**
149 * Processes given objective on given device.
150 * Objectives submitted through this method are guaranteed to be executed in order.
151 *
152 * @param deviceId Device ID
153 * @param originalObjective Flow objective to be executed
154 */
155 private void process(DeviceId deviceId, Objective originalObjective) {
156 // Inject ObjectiveContext such that we can get notified when it is completed
157 Objective.Builder objBuilder = originalObjective.copy();
158 Optional<ObjectiveContext> originalContext = originalObjective.context();
Charles Chanc51852e2019-01-07 19:59:10 -0800159 ObjectiveContext context = new InOrderObjectiveContext(deviceId, originalContext.orElse(null));
Charles Chana7903c82018-03-15 20:14:16 -0700160
161 // Preserve Objective.Operation
162 Objective objective;
163 switch (originalObjective.op()) {
164 case ADD:
165 objective = objBuilder.add(context);
166 break;
167 case ADD_TO_EXISTING:
168 objective = ((NextObjective.Builder) objBuilder).addToExisting(context);
169 break;
170 case REMOVE:
171 objective = objBuilder.remove(context);
172 break;
173 case REMOVE_FROM_EXISTING:
174 objective = ((NextObjective.Builder) objBuilder).removeFromExisting(context);
175 break;
176 case MODIFY:
177 objective = ((NextObjective.Builder) objBuilder).modify(context);
178 break;
179 case VERIFY:
180 objective = ((NextObjective.Builder) objBuilder).verify(context);
181 break;
182 default:
183 log.error("Unknown flow objecitve operation {}", originalObjective.op());
184 return;
185 }
186
187 enqueue(deviceId, objective);
188 }
189
190 @Override
191 public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
192 process(deviceId, filteringObjective);
193 }
194
195 @Override
196 public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
197 process(deviceId, forwardingObjective);
198 }
199
200 @Override
201 public void next(DeviceId deviceId, NextObjective nextObjective) {
202 process(deviceId, nextObjective);
203 }
204
Charles Chan33f4a912018-04-19 23:35:30 -0700205 @Override
206 public ListMultimap<FilteringObjQueueKey, Objective> getFilteringObjQueue() {
207 return filtObjQueue;
208 }
209
210 @Override
211 public ListMultimap<ForwardingObjQueueKey, Objective> getForwardingObjQueue() {
212 return fwdObjQueue;
213 }
214
215 @Override
216 public ListMultimap<NextObjQueueKey, Objective> getNextObjQueue() {
217 return nextObjQueue;
218 }
219
220 @Override
221 public Map<FilteringObjQueueKey, Objective> getFilteringObjQueueHead() {
222 return filtObjQueueHead.asMap();
223 }
224
225 @Override
226 public Map<ForwardingObjQueueKey, Objective> getForwardingObjQueueHead() {
227 return fwdObjQueueHead.asMap();
228 }
229
230 @Override
231 public Map<NextObjQueueKey, Objective> getNextObjQueueHead() {
232 return nextObjQueueHead.asMap();
233 }
234
235 @Override
236 public void clearQueue() {
237 filtObjQueueHead.invalidateAll();
238 fwdObjQueueHead.invalidateAll();
239 nextObjQueueHead.invalidateAll();
240
241 filtObjQueueHead.cleanUp();
242 fwdObjQueueHead.cleanUp();
243 nextObjQueueHead.cleanUp();
244
245 filtObjQueue.clear();
246 fwdObjQueue.clear();
247 nextObjQueue.clear();
248 }
249
Charles Chana7903c82018-03-15 20:14:16 -0700250 /**
251 * Enqueue flow objective. Execute the flow objective if there is no pending objective ahead.
252 *
253 * @param deviceId Device ID
254 * @param obj Flow objective
255 */
256 private synchronized void enqueue(DeviceId deviceId, Objective obj) {
257 int queueSize;
258 int priority = obj.priority();
259
Charles Chanc09ad6d2018-04-04 16:31:23 -0700260 LogLevel logLevel = (obj.op() == Objective.Operation.VERIFY) ? LogLevel.TRACE : LogLevel.DEBUG;
261 Tools.log(log, logLevel, "Enqueue {}", obj);
262
Charles Chana7903c82018-03-15 20:14:16 -0700263 if (obj instanceof FilteringObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700264 FilteringObjQueueKey k = new FilteringObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
Charles Chana7903c82018-03-15 20:14:16 -0700265 filtObjQueue.put(k, obj);
266 queueSize = filtObjQueue.get(k).size();
Charles Chana7903c82018-03-15 20:14:16 -0700267 } else if (obj instanceof ForwardingObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700268 ForwardingObjQueueKey k =
269 new ForwardingObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
Charles Chana7903c82018-03-15 20:14:16 -0700270 fwdObjQueue.put(k, obj);
271 queueSize = fwdObjQueue.get(k).size();
Charles Chana7903c82018-03-15 20:14:16 -0700272 } else if (obj instanceof NextObjective) {
Charles Chana7903c82018-03-15 20:14:16 -0700273 NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
274 nextObjQueue.put(k, obj);
275 queueSize = nextObjQueue.get(k).size();
Charles Chana7903c82018-03-15 20:14:16 -0700276 } else {
277 log.error("Unknown flow objective instance: {}", obj.getClass().getName());
278 return;
279 }
Charles Chanc09ad6d2018-04-04 16:31:23 -0700280 log.trace("{} queue size {}", obj.getClass().getSimpleName(), queueSize);
Charles Chana7903c82018-03-15 20:14:16 -0700281
282 // Execute immediately if there is no pending obj ahead
283 if (queueSize == 1) {
Charles Chana7903c82018-03-15 20:14:16 -0700284 execute(deviceId, obj);
285 }
286 }
287
288 /**
289 * Dequeue flow objective. Execute the next flow objective in the queue, if any.
290 *
291 * @param deviceId Device ID
292 * @param obj Flow objective
Charles Chan35b589c2018-04-26 16:00:23 -0400293 * @param error ObjectiveError that triggers this dequeue. Null if this is not triggered by an error.
Charles Chana7903c82018-03-15 20:14:16 -0700294 */
Charles Chan35b589c2018-04-26 16:00:23 -0400295 private synchronized void dequeue(DeviceId deviceId, Objective obj, ObjectiveError error) {
Charles Chana7903c82018-03-15 20:14:16 -0700296 List<Objective> remaining;
297 int priority = obj.priority();
298
Charles Chanc09ad6d2018-04-04 16:31:23 -0700299 LogLevel logLevel = (obj.op() == Objective.Operation.VERIFY) ? LogLevel.TRACE : LogLevel.DEBUG;
300 Tools.log(log, logLevel, "Dequeue {}", obj);
301
Charles Chana7903c82018-03-15 20:14:16 -0700302 if (obj instanceof FilteringObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700303 FilteringObjQueueKey k = new FilteringObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
Charles Chana724c5132018-11-27 21:33:33 +0800304 if (!Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
Charles Chan5cf77f52018-11-07 15:05:59 -0800305 filtObjQueueHead.invalidate(k);
306 }
Charles Chana7903c82018-03-15 20:14:16 -0700307 filtObjQueue.remove(k, obj);
308 remaining = filtObjQueue.get(k);
Charles Chana7903c82018-03-15 20:14:16 -0700309 } else if (obj instanceof ForwardingObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700310 ForwardingObjQueueKey k =
311 new ForwardingObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
Charles Chana724c5132018-11-27 21:33:33 +0800312 if (!Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
Charles Chan5cf77f52018-11-07 15:05:59 -0800313 fwdObjQueueHead.invalidate(k);
314 }
Charles Chana7903c82018-03-15 20:14:16 -0700315 fwdObjQueue.remove(k, obj);
316 remaining = fwdObjQueue.get(k);
Charles Chana7903c82018-03-15 20:14:16 -0700317 } else if (obj instanceof NextObjective) {
Charles Chan35b589c2018-04-26 16:00:23 -0400318 if (error != null) {
319 // Remove pendingForwards and pendingNexts if next objective failed
320 Set<PendingFlowObjective> removedForwards = pendingForwards.remove(obj.id());
321 List<PendingFlowObjective> removedNexts = pendingNexts.remove(obj.id());
322
323 if (removedForwards != null) {
324 removedForwards.stream().map(PendingFlowObjective::flowObjective)
325 .forEach(pendingObj -> pendingObj.context().ifPresent(c ->
326 c.onError(pendingObj, error)));
327 }
328 if (removedNexts != null) {
329 removedNexts.stream().map(PendingFlowObjective::flowObjective)
330 .forEach(pendingObj -> pendingObj.context().ifPresent(c ->
331 c.onError(pendingObj, error)));
332 }
333 }
Charles Chana7903c82018-03-15 20:14:16 -0700334 NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
Charles Chana724c5132018-11-27 21:33:33 +0800335 if (!Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
Charles Chan5cf77f52018-11-07 15:05:59 -0800336 nextObjQueueHead.invalidate(k);
337 }
Charles Chana7903c82018-03-15 20:14:16 -0700338 nextObjQueue.remove(k, obj);
339 remaining = nextObjQueue.get(k);
Charles Chana7903c82018-03-15 20:14:16 -0700340 } else {
341 log.error("Unknown flow objective instance: {}", obj.getClass().getName());
342 return;
343 }
Charles Chanc09ad6d2018-04-04 16:31:23 -0700344 log.trace("{} queue size {}", obj.getClass().getSimpleName(), remaining.size());
Charles Chana7903c82018-03-15 20:14:16 -0700345
346 // Submit the next one in the queue, if any
347 if (remaining.size() > 0) {
Charles Chana7903c82018-03-15 20:14:16 -0700348 execute(deviceId, remaining.get(0));
349 }
350 }
351
352 /**
353 * Submit the flow objective. Starting from this point on, the execution order is not guaranteed.
354 * Therefore we must be certain that this method is called in-order.
355 *
356 * @param deviceId Device ID
357 * @param obj Flow objective
358 */
359 private void execute(DeviceId deviceId, Objective obj) {
Charles Chanc09ad6d2018-04-04 16:31:23 -0700360 LogLevel logLevel = (obj.op() == Objective.Operation.VERIFY) ? LogLevel.TRACE : LogLevel.DEBUG;
361 Tools.log(log, logLevel, "Submit objective installer, deviceId {}, obj {}", deviceId, obj);
362
Charles Chan45c19d72018-04-19 21:38:40 -0700363 int priority = obj.priority();
Charles Chana7903c82018-03-15 20:14:16 -0700364 if (obj instanceof FilteringObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700365 FilteringObjQueueKey k = new FilteringObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
Charles Chan45c19d72018-04-19 21:38:40 -0700366 filtObjQueueHead.put(k, obj);
Charles Chana7903c82018-03-15 20:14:16 -0700367 super.filter(deviceId, (FilteringObjective) obj);
368 } else if (obj instanceof ForwardingObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700369 ForwardingObjQueueKey k =
370 new ForwardingObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
Charles Chan45c19d72018-04-19 21:38:40 -0700371 fwdObjQueueHead.put(k, obj);
Charles Chana7903c82018-03-15 20:14:16 -0700372 super.forward(deviceId, (ForwardingObjective) obj);
373 } else if (obj instanceof NextObjective) {
Charles Chan45c19d72018-04-19 21:38:40 -0700374 NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
375 nextObjQueueHead.put(k, obj);
Charles Chana7903c82018-03-15 20:14:16 -0700376 super.next(deviceId, (NextObjective) obj);
377 } else {
378 log.error("Unknown flow objective instance: {}", obj.getClass().getName());
379 }
380 }
381
382 private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
383 @Override
384 public void notify(ObjectiveEvent event) {
385 if (event.type() == ObjectiveEvent.Type.ADD) {
386 log.debug("Received notification of obj event {}", event);
387 Set<PendingFlowObjective> pending;
388
389 // first send all pending flows
390 synchronized (pendingForwards) {
391 // needs to be synchronized for queueObjective lookup
392 pending = pendingForwards.remove(event.subject());
393 }
394 if (pending == null) {
Charles Chanc09ad6d2018-04-04 16:31:23 -0700395 log.debug("No forwarding objectives pending for this obj event {}", event);
Charles Chana7903c82018-03-15 20:14:16 -0700396 } else {
397 log.debug("Processing {} pending forwarding objectives for nextId {}",
398 pending.size(), event.subject());
Charles Chan45c19d72018-04-19 21:38:40 -0700399 // execute pending forwards one by one
Charles Chana7903c82018-03-15 20:14:16 -0700400 pending.forEach(p -> execute(p.deviceId(), p.flowObjective()));
401 }
402
403 // now check for pending next-objectives
404 // Note: This is still necessary despite the existence of in-order execution.
405 // Since the in-order execution does not handle the case of
406 // ADD_TO_EXISTING coming before ADD
407 List<PendingFlowObjective> pendNexts;
408 synchronized (pendingNexts) {
409 // needs to be synchronized for queueObjective lookup
410 pendNexts = pendingNexts.remove(event.subject());
411 }
412 if (pendNexts == null) {
Charles Chanc09ad6d2018-04-04 16:31:23 -0700413 log.debug("No next objectives pending for this obj event {}", event);
Charles Chana7903c82018-03-15 20:14:16 -0700414 } else {
415 log.debug("Processing {} pending next objectives for nextId {}",
416 pendNexts.size(), event.subject());
Charles Chan45c19d72018-04-19 21:38:40 -0700417 // execute pending nexts one by one
Charles Chana7903c82018-03-15 20:14:16 -0700418 pendNexts.forEach(p -> execute(p.deviceId(), p.flowObjective()));
419 }
420 }
421 }
422 }
Charles Chanc51852e2019-01-07 19:59:10 -0800423
424 final class InOrderObjectiveContext implements ObjectiveContext {
425 private final DeviceId deviceId;
426 private final ObjectiveContext originalContext;
Charles Chan0a6330f2019-01-16 11:11:02 -0800427 // Prevent onSuccess from being executed after onError is called
428 // i.e. when the context actually succeed after the cache timeout
429 private final AtomicBoolean failed;
Charles Chanc51852e2019-01-07 19:59:10 -0800430
431 InOrderObjectiveContext(DeviceId deviceId, ObjectiveContext originalContext) {
432 this.deviceId = deviceId;
433 this.originalContext = originalContext;
Charles Chan0a6330f2019-01-16 11:11:02 -0800434 this.failed = new AtomicBoolean(false);
Charles Chanc51852e2019-01-07 19:59:10 -0800435 }
436
437 @Override
438 public void onSuccess(Objective objective) {
439 log.trace("Flow objective onSuccess {}", objective);
440
Charles Chan0a6330f2019-01-16 11:11:02 -0800441 if (!failed.get()) {
Charles Chanc51852e2019-01-07 19:59:10 -0800442 dequeue(deviceId, objective, null);
443 if (originalContext != null) {
444 originalContext.onSuccess(objective);
445 }
446 }
447
448 }
449 @Override
450 public void onError(Objective objective, ObjectiveError error) {
451 log.warn("Flow objective onError {}. Reason = {}", objective, error);
452
Charles Chan0a6330f2019-01-16 11:11:02 -0800453 if (!failed.getAndSet(true)) {
Charles Chanc51852e2019-01-07 19:59:10 -0800454 dequeue(deviceId, objective, error);
455 if (originalContext != null) {
456 originalContext.onError(objective, error);
457 }
458 }
459 }
460 }
Charles Chana7903c82018-03-15 20:14:16 -0700461}