blob: 4f8404ba26a8189c1d772173167df38bd427bfe7 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.net.flowobjective.impl;
18
Charles Chan45c19d72018-04-19 21:38:40 -070019import com.google.common.cache.Cache;
20import com.google.common.cache.CacheBuilder;
Charles Chana724c5132018-11-27 21:33:33 +080021import com.google.common.cache.RemovalListener;
22import com.google.common.cache.RemovalListeners;
Charles Chana7903c82018-03-15 20:14:16 -070023import com.google.common.collect.ArrayListMultimap;
24import com.google.common.collect.ListMultimap;
25import com.google.common.collect.Multimaps;
Charles Chanc09ad6d2018-04-04 16:31:23 -070026import org.onlab.util.Tools;
27import org.onlab.util.Tools.LogLevel;
Charles Chana7903c82018-03-15 20:14:16 -070028import org.onosproject.net.DeviceId;
Charles Chan33f4a912018-04-19 23:35:30 -070029import org.onosproject.net.flowobjective.FilteringObjQueueKey;
Charles Chana7903c82018-03-15 20:14:16 -070030import org.onosproject.net.flowobjective.FilteringObjective;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070031import org.onosproject.net.flowobjective.FlowObjectiveService;
Charles Chana7903c82018-03-15 20:14:16 -070032import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
Charles Chan33f4a912018-04-19 23:35:30 -070033import org.onosproject.net.flowobjective.ForwardingObjQueueKey;
Charles Chana7903c82018-03-15 20:14:16 -070034import org.onosproject.net.flowobjective.ForwardingObjective;
Charles Chan33f4a912018-04-19 23:35:30 -070035import org.onosproject.net.flowobjective.NextObjQueueKey;
Charles Chana7903c82018-03-15 20:14:16 -070036import org.onosproject.net.flowobjective.NextObjective;
37import org.onosproject.net.flowobjective.Objective;
38import org.onosproject.net.flowobjective.ObjectiveContext;
39import org.onosproject.net.flowobjective.ObjectiveError;
40import org.onosproject.net.flowobjective.ObjectiveEvent;
Charles Chana724c5132018-11-27 21:33:33 +080041import org.onosproject.net.flowobjective.ObjectiveQueueKey;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070042import org.osgi.service.component.annotations.Activate;
43import org.osgi.service.component.annotations.Component;
44import org.osgi.service.component.annotations.Deactivate;
Charles Chana7903c82018-03-15 20:14:16 -070045import org.slf4j.Logger;
46import org.slf4j.LoggerFactory;
47
48import java.util.List;
Charles Chan33f4a912018-04-19 23:35:30 -070049import java.util.Map;
Charles Chana724c5132018-11-27 21:33:33 +080050import java.util.Objects;
Charles Chana7903c82018-03-15 20:14:16 -070051import java.util.Optional;
52import java.util.Set;
Charles Chana724c5132018-11-27 21:33:33 +080053import java.util.concurrent.ExecutorService;
Charles Chan45c19d72018-04-19 21:38:40 -070054import java.util.concurrent.ScheduledExecutorService;
55import java.util.concurrent.TimeUnit;
Charles Chanc51852e2019-01-07 19:59:10 -080056import java.util.concurrent.atomic.AtomicBoolean;
Charles Chan45c19d72018-04-19 21:38:40 -070057
Charles Chana724c5132018-11-27 21:33:33 +080058import static java.util.concurrent.Executors.newSingleThreadExecutor;
Charles Chan45c19d72018-04-19 21:38:40 -070059import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
60import static org.onlab.util.Tools.groupedThreads;
Charles Chana7903c82018-03-15 20:14:16 -070061
Ray Milkeyd84f89b2018-08-17 14:54:17 -070062@Component(immediate = true, service = FlowObjectiveService.class)
Charles Chana7903c82018-03-15 20:14:16 -070063public class InOrderFlowObjectiveManager extends FlowObjectiveManager {
64 private final Logger log = LoggerFactory.getLogger(getClass());
65
Charles Chan45c19d72018-04-19 21:38:40 -070066 // TODO Make queue timeout configurable
Charles Chanb4f4fdb2018-12-21 13:55:29 -080067 static final int DEFAULT_OBJ_TIMEOUT = 15000;
68 int objTimeoutMs = DEFAULT_OBJ_TIMEOUT;
Charles Chan45c19d72018-04-19 21:38:40 -070069
Charles Chan33f4a912018-04-19 23:35:30 -070070 private Cache<FilteringObjQueueKey, Objective> filtObjQueueHead;
71 private Cache<ForwardingObjQueueKey, Objective> fwdObjQueueHead;
Charles Chan45c19d72018-04-19 21:38:40 -070072 private Cache<NextObjQueueKey, Objective> nextObjQueueHead;
73 private ScheduledExecutorService cacheCleaner;
Charles Chana724c5132018-11-27 21:33:33 +080074 private ExecutorService filtCacheEventExecutor;
75 private ExecutorService fwdCacheEventExecutor;
76 private ExecutorService nextCacheEventExecutor;
Charles Chan45c19d72018-04-19 21:38:40 -070077
Charles Chan33f4a912018-04-19 23:35:30 -070078 private ListMultimap<FilteringObjQueueKey, Objective> filtObjQueue =
Charles Chana7903c82018-03-15 20:14:16 -070079 Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
Charles Chan33f4a912018-04-19 23:35:30 -070080 private ListMultimap<ForwardingObjQueueKey, Objective> fwdObjQueue =
Charles Chana7903c82018-03-15 20:14:16 -070081 Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
82 private ListMultimap<NextObjQueueKey, Objective> nextObjQueue =
83 Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
84
85 final FlowObjectiveStoreDelegate delegate = new InternalStoreDelegate();
86
87 @Activate
88 protected void activate() {
89 super.activate();
Charles Chan45c19d72018-04-19 21:38:40 -070090
Charles Chana724c5132018-11-27 21:33:33 +080091 filtCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-filt", log));
92 fwdCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-fwd", log));
93 nextCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-next", log));
94
95 RemovalListener<ObjectiveQueueKey, Objective> removalListener = notification -> {
96 Objective obj = notification.getValue();
97 switch (notification.getCause()) {
98 case EXPIRED:
99 case COLLECTED:
100 case SIZE:
101 obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
102 break;
103 case EXPLICIT: // No action when the objective completes correctly
104 case REPLACED: // No action when a pending forward or next objective gets executed
105 default:
106 break;
107 }
108 };
Charles Chan45c19d72018-04-19 21:38:40 -0700109 filtObjQueueHead = CacheBuilder.newBuilder()
Charles Chan1491b9b2018-11-27 21:33:33 +0800110 .expireAfterWrite(objTimeoutMs, TimeUnit.MILLISECONDS)
Charles Chana724c5132018-11-27 21:33:33 +0800111 .removalListener(RemovalListeners.asynchronous(removalListener, filtCacheEventExecutor))
112 .build();
Charles Chan45c19d72018-04-19 21:38:40 -0700113 fwdObjQueueHead = CacheBuilder.newBuilder()
Charles Chan1491b9b2018-11-27 21:33:33 +0800114 .expireAfterWrite(objTimeoutMs, TimeUnit.MILLISECONDS)
Charles Chana724c5132018-11-27 21:33:33 +0800115 .removalListener(RemovalListeners.asynchronous(removalListener, fwdCacheEventExecutor))
116 .build();
Charles Chan45c19d72018-04-19 21:38:40 -0700117 nextObjQueueHead = CacheBuilder.newBuilder()
Charles Chan1491b9b2018-11-27 21:33:33 +0800118 .expireAfterWrite(objTimeoutMs, TimeUnit.MILLISECONDS)
Charles Chana724c5132018-11-27 21:33:33 +0800119 .removalListener(RemovalListeners.asynchronous(removalListener, nextCacheEventExecutor))
120 .build();
Charles Chan45c19d72018-04-19 21:38:40 -0700121
122 cacheCleaner = newSingleThreadScheduledExecutor(groupedThreads("onos/flowobj", "cache-cleaner", log));
123 cacheCleaner.scheduleAtFixedRate(() -> {
124 filtObjQueueHead.cleanUp();
125 fwdObjQueueHead.cleanUp();
126 nextObjQueueHead.cleanUp();
Charles Chan1491b9b2018-11-27 21:33:33 +0800127 }, 0, objTimeoutMs, TimeUnit.MILLISECONDS);
Charles Chan45c19d72018-04-19 21:38:40 -0700128
Charles Chana7903c82018-03-15 20:14:16 -0700129 // Replace store delegate to make sure pendingForward and pendingNext are resubmitted to
Charles Chan45c19d72018-04-19 21:38:40 -0700130 // execute()
Charles Chana7903c82018-03-15 20:14:16 -0700131 flowObjectiveStore.unsetDelegate(super.delegate);
132 flowObjectiveStore.setDelegate(delegate);
133 }
134
135 @Deactivate
136 protected void deactivate() {
Charles Chan45c19d72018-04-19 21:38:40 -0700137 cacheCleaner.shutdown();
Charles Chan33f4a912018-04-19 23:35:30 -0700138 clearQueue();
Charles Chan45c19d72018-04-19 21:38:40 -0700139
Charles Chana724c5132018-11-27 21:33:33 +0800140 filtCacheEventExecutor.shutdown();
141 fwdCacheEventExecutor.shutdown();
142 nextCacheEventExecutor.shutdown();
143
Charles Chana7903c82018-03-15 20:14:16 -0700144 super.deactivate();
145 }
146
147 /**
148 * Processes given objective on given device.
149 * Objectives submitted through this method are guaranteed to be executed in order.
150 *
151 * @param deviceId Device ID
152 * @param originalObjective Flow objective to be executed
153 */
154 private void process(DeviceId deviceId, Objective originalObjective) {
155 // Inject ObjectiveContext such that we can get notified when it is completed
156 Objective.Builder objBuilder = originalObjective.copy();
157 Optional<ObjectiveContext> originalContext = originalObjective.context();
Charles Chanc51852e2019-01-07 19:59:10 -0800158 ObjectiveContext context = new InOrderObjectiveContext(deviceId, originalContext.orElse(null));
Charles Chana7903c82018-03-15 20:14:16 -0700159
160 // Preserve Objective.Operation
161 Objective objective;
162 switch (originalObjective.op()) {
163 case ADD:
164 objective = objBuilder.add(context);
165 break;
166 case ADD_TO_EXISTING:
167 objective = ((NextObjective.Builder) objBuilder).addToExisting(context);
168 break;
169 case REMOVE:
170 objective = objBuilder.remove(context);
171 break;
172 case REMOVE_FROM_EXISTING:
173 objective = ((NextObjective.Builder) objBuilder).removeFromExisting(context);
174 break;
175 case MODIFY:
176 objective = ((NextObjective.Builder) objBuilder).modify(context);
177 break;
178 case VERIFY:
179 objective = ((NextObjective.Builder) objBuilder).verify(context);
180 break;
181 default:
182 log.error("Unknown flow objecitve operation {}", originalObjective.op());
183 return;
184 }
185
186 enqueue(deviceId, objective);
187 }
188
189 @Override
190 public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
191 process(deviceId, filteringObjective);
192 }
193
194 @Override
195 public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
196 process(deviceId, forwardingObjective);
197 }
198
199 @Override
200 public void next(DeviceId deviceId, NextObjective nextObjective) {
201 process(deviceId, nextObjective);
202 }
203
Charles Chan33f4a912018-04-19 23:35:30 -0700204 @Override
205 public ListMultimap<FilteringObjQueueKey, Objective> getFilteringObjQueue() {
206 return filtObjQueue;
207 }
208
209 @Override
210 public ListMultimap<ForwardingObjQueueKey, Objective> getForwardingObjQueue() {
211 return fwdObjQueue;
212 }
213
214 @Override
215 public ListMultimap<NextObjQueueKey, Objective> getNextObjQueue() {
216 return nextObjQueue;
217 }
218
219 @Override
220 public Map<FilteringObjQueueKey, Objective> getFilteringObjQueueHead() {
221 return filtObjQueueHead.asMap();
222 }
223
224 @Override
225 public Map<ForwardingObjQueueKey, Objective> getForwardingObjQueueHead() {
226 return fwdObjQueueHead.asMap();
227 }
228
229 @Override
230 public Map<NextObjQueueKey, Objective> getNextObjQueueHead() {
231 return nextObjQueueHead.asMap();
232 }
233
234 @Override
235 public void clearQueue() {
236 filtObjQueueHead.invalidateAll();
237 fwdObjQueueHead.invalidateAll();
238 nextObjQueueHead.invalidateAll();
239
240 filtObjQueueHead.cleanUp();
241 fwdObjQueueHead.cleanUp();
242 nextObjQueueHead.cleanUp();
243
244 filtObjQueue.clear();
245 fwdObjQueue.clear();
246 nextObjQueue.clear();
247 }
248
Charles Chana7903c82018-03-15 20:14:16 -0700249 /**
250 * Enqueue flow objective. Execute the flow objective if there is no pending objective ahead.
251 *
252 * @param deviceId Device ID
253 * @param obj Flow objective
254 */
255 private synchronized void enqueue(DeviceId deviceId, Objective obj) {
256 int queueSize;
257 int priority = obj.priority();
258
Charles Chanc09ad6d2018-04-04 16:31:23 -0700259 LogLevel logLevel = (obj.op() == Objective.Operation.VERIFY) ? LogLevel.TRACE : LogLevel.DEBUG;
260 Tools.log(log, logLevel, "Enqueue {}", obj);
261
Charles Chana7903c82018-03-15 20:14:16 -0700262 if (obj instanceof FilteringObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700263 FilteringObjQueueKey k = new FilteringObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
Charles Chana7903c82018-03-15 20:14:16 -0700264 filtObjQueue.put(k, obj);
265 queueSize = filtObjQueue.get(k).size();
Charles Chana7903c82018-03-15 20:14:16 -0700266 } else if (obj instanceof ForwardingObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700267 ForwardingObjQueueKey k =
268 new ForwardingObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
Charles Chana7903c82018-03-15 20:14:16 -0700269 fwdObjQueue.put(k, obj);
270 queueSize = fwdObjQueue.get(k).size();
Charles Chana7903c82018-03-15 20:14:16 -0700271 } else if (obj instanceof NextObjective) {
Charles Chana7903c82018-03-15 20:14:16 -0700272 NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
273 nextObjQueue.put(k, obj);
274 queueSize = nextObjQueue.get(k).size();
Charles Chana7903c82018-03-15 20:14:16 -0700275 } else {
276 log.error("Unknown flow objective instance: {}", obj.getClass().getName());
277 return;
278 }
Charles Chanc09ad6d2018-04-04 16:31:23 -0700279 log.trace("{} queue size {}", obj.getClass().getSimpleName(), queueSize);
Charles Chana7903c82018-03-15 20:14:16 -0700280
281 // Execute immediately if there is no pending obj ahead
282 if (queueSize == 1) {
Charles Chana7903c82018-03-15 20:14:16 -0700283 execute(deviceId, obj);
284 }
285 }
286
287 /**
288 * Dequeue flow objective. Execute the next flow objective in the queue, if any.
289 *
290 * @param deviceId Device ID
291 * @param obj Flow objective
Charles Chan35b589c2018-04-26 16:00:23 -0400292 * @param error ObjectiveError that triggers this dequeue. Null if this is not triggered by an error.
Charles Chana7903c82018-03-15 20:14:16 -0700293 */
Charles Chan35b589c2018-04-26 16:00:23 -0400294 private synchronized void dequeue(DeviceId deviceId, Objective obj, ObjectiveError error) {
Charles Chana7903c82018-03-15 20:14:16 -0700295 List<Objective> remaining;
296 int priority = obj.priority();
297
Charles Chanc09ad6d2018-04-04 16:31:23 -0700298 LogLevel logLevel = (obj.op() == Objective.Operation.VERIFY) ? LogLevel.TRACE : LogLevel.DEBUG;
299 Tools.log(log, logLevel, "Dequeue {}", obj);
300
Charles Chana7903c82018-03-15 20:14:16 -0700301 if (obj instanceof FilteringObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700302 FilteringObjQueueKey k = new FilteringObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
Charles Chana724c5132018-11-27 21:33:33 +0800303 if (!Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
Charles Chan5cf77f52018-11-07 15:05:59 -0800304 filtObjQueueHead.invalidate(k);
305 }
Charles Chana7903c82018-03-15 20:14:16 -0700306 filtObjQueue.remove(k, obj);
307 remaining = filtObjQueue.get(k);
Charles Chana7903c82018-03-15 20:14:16 -0700308 } else if (obj instanceof ForwardingObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700309 ForwardingObjQueueKey k =
310 new ForwardingObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
Charles Chana724c5132018-11-27 21:33:33 +0800311 if (!Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
Charles Chan5cf77f52018-11-07 15:05:59 -0800312 fwdObjQueueHead.invalidate(k);
313 }
Charles Chana7903c82018-03-15 20:14:16 -0700314 fwdObjQueue.remove(k, obj);
315 remaining = fwdObjQueue.get(k);
Charles Chana7903c82018-03-15 20:14:16 -0700316 } else if (obj instanceof NextObjective) {
Charles Chan35b589c2018-04-26 16:00:23 -0400317 if (error != null) {
318 // Remove pendingForwards and pendingNexts if next objective failed
319 Set<PendingFlowObjective> removedForwards = pendingForwards.remove(obj.id());
320 List<PendingFlowObjective> removedNexts = pendingNexts.remove(obj.id());
321
322 if (removedForwards != null) {
323 removedForwards.stream().map(PendingFlowObjective::flowObjective)
324 .forEach(pendingObj -> pendingObj.context().ifPresent(c ->
325 c.onError(pendingObj, error)));
326 }
327 if (removedNexts != null) {
328 removedNexts.stream().map(PendingFlowObjective::flowObjective)
329 .forEach(pendingObj -> pendingObj.context().ifPresent(c ->
330 c.onError(pendingObj, error)));
331 }
332 }
Charles Chana7903c82018-03-15 20:14:16 -0700333 NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
Charles Chana724c5132018-11-27 21:33:33 +0800334 if (!Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
Charles Chan5cf77f52018-11-07 15:05:59 -0800335 nextObjQueueHead.invalidate(k);
336 }
Charles Chana7903c82018-03-15 20:14:16 -0700337 nextObjQueue.remove(k, obj);
338 remaining = nextObjQueue.get(k);
Charles Chana7903c82018-03-15 20:14:16 -0700339 } else {
340 log.error("Unknown flow objective instance: {}", obj.getClass().getName());
341 return;
342 }
Charles Chanc09ad6d2018-04-04 16:31:23 -0700343 log.trace("{} queue size {}", obj.getClass().getSimpleName(), remaining.size());
Charles Chana7903c82018-03-15 20:14:16 -0700344
345 // Submit the next one in the queue, if any
346 if (remaining.size() > 0) {
Charles Chana7903c82018-03-15 20:14:16 -0700347 execute(deviceId, remaining.get(0));
348 }
349 }
350
351 /**
352 * Submit the flow objective. Starting from this point on, the execution order is not guaranteed.
353 * Therefore we must be certain that this method is called in-order.
354 *
355 * @param deviceId Device ID
356 * @param obj Flow objective
357 */
358 private void execute(DeviceId deviceId, Objective obj) {
Charles Chanc09ad6d2018-04-04 16:31:23 -0700359 LogLevel logLevel = (obj.op() == Objective.Operation.VERIFY) ? LogLevel.TRACE : LogLevel.DEBUG;
360 Tools.log(log, logLevel, "Submit objective installer, deviceId {}, obj {}", deviceId, obj);
361
Charles Chan45c19d72018-04-19 21:38:40 -0700362 int priority = obj.priority();
Charles Chana7903c82018-03-15 20:14:16 -0700363 if (obj instanceof FilteringObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700364 FilteringObjQueueKey k = new FilteringObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
Charles Chan45c19d72018-04-19 21:38:40 -0700365 filtObjQueueHead.put(k, obj);
Charles Chana7903c82018-03-15 20:14:16 -0700366 super.filter(deviceId, (FilteringObjective) obj);
367 } else if (obj instanceof ForwardingObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700368 ForwardingObjQueueKey k =
369 new ForwardingObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
Charles Chan45c19d72018-04-19 21:38:40 -0700370 fwdObjQueueHead.put(k, obj);
Charles Chana7903c82018-03-15 20:14:16 -0700371 super.forward(deviceId, (ForwardingObjective) obj);
372 } else if (obj instanceof NextObjective) {
Charles Chan45c19d72018-04-19 21:38:40 -0700373 NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
374 nextObjQueueHead.put(k, obj);
Charles Chana7903c82018-03-15 20:14:16 -0700375 super.next(deviceId, (NextObjective) obj);
376 } else {
377 log.error("Unknown flow objective instance: {}", obj.getClass().getName());
378 }
379 }
380
381 private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
382 @Override
383 public void notify(ObjectiveEvent event) {
384 if (event.type() == ObjectiveEvent.Type.ADD) {
385 log.debug("Received notification of obj event {}", event);
386 Set<PendingFlowObjective> pending;
387
388 // first send all pending flows
389 synchronized (pendingForwards) {
390 // needs to be synchronized for queueObjective lookup
391 pending = pendingForwards.remove(event.subject());
392 }
393 if (pending == null) {
Charles Chanc09ad6d2018-04-04 16:31:23 -0700394 log.debug("No forwarding objectives pending for this obj event {}", event);
Charles Chana7903c82018-03-15 20:14:16 -0700395 } else {
396 log.debug("Processing {} pending forwarding objectives for nextId {}",
397 pending.size(), event.subject());
Charles Chan45c19d72018-04-19 21:38:40 -0700398 // execute pending forwards one by one
Charles Chana7903c82018-03-15 20:14:16 -0700399 pending.forEach(p -> execute(p.deviceId(), p.flowObjective()));
400 }
401
402 // now check for pending next-objectives
403 // Note: This is still necessary despite the existence of in-order execution.
404 // Since the in-order execution does not handle the case of
405 // ADD_TO_EXISTING coming before ADD
406 List<PendingFlowObjective> pendNexts;
407 synchronized (pendingNexts) {
408 // needs to be synchronized for queueObjective lookup
409 pendNexts = pendingNexts.remove(event.subject());
410 }
411 if (pendNexts == null) {
Charles Chanc09ad6d2018-04-04 16:31:23 -0700412 log.debug("No next objectives pending for this obj event {}", event);
Charles Chana7903c82018-03-15 20:14:16 -0700413 } else {
414 log.debug("Processing {} pending next objectives for nextId {}",
415 pendNexts.size(), event.subject());
Charles Chan45c19d72018-04-19 21:38:40 -0700416 // execute pending nexts one by one
Charles Chana7903c82018-03-15 20:14:16 -0700417 pendNexts.forEach(p -> execute(p.deviceId(), p.flowObjective()));
418 }
419 }
420 }
421 }
Charles Chanc51852e2019-01-07 19:59:10 -0800422
423 final class InOrderObjectiveContext implements ObjectiveContext {
424 private final DeviceId deviceId;
425 private final ObjectiveContext originalContext;
426 // Prevent context from being executed multiple times.
427 // E.g. when the context actually succeed after the cache timeout
428 private final AtomicBoolean done;
429
430 InOrderObjectiveContext(DeviceId deviceId, ObjectiveContext originalContext) {
431 this.deviceId = deviceId;
432 this.originalContext = originalContext;
433 this.done = new AtomicBoolean(false);
434 }
435
436 @Override
437 public void onSuccess(Objective objective) {
438 log.trace("Flow objective onSuccess {}", objective);
439
440 if (!done.getAndSet(true)) {
441 dequeue(deviceId, objective, null);
442 if (originalContext != null) {
443 originalContext.onSuccess(objective);
444 }
445 }
446
447 }
448 @Override
449 public void onError(Objective objective, ObjectiveError error) {
450 log.warn("Flow objective onError {}. Reason = {}", objective, error);
451
452 if (!done.getAndSet(true)) {
453 dequeue(deviceId, objective, error);
454 if (originalContext != null) {
455 originalContext.onError(objective, error);
456 }
457 }
458 }
459 }
Charles Chana7903c82018-03-15 20:14:16 -0700460}