blob: 50c42dd335fd0dcd70b7fc13efb1c3c919a76a57 [file] [log] [blame]
Charles Chana7903c82018-03-15 20:14:16 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.net.flowobjective.impl;
18
Charles Chan45c19d72018-04-19 21:38:40 -070019import com.google.common.cache.Cache;
20import com.google.common.cache.CacheBuilder;
Charles Chana724c5132018-11-27 21:33:33 +080021import com.google.common.cache.RemovalListener;
22import com.google.common.cache.RemovalListeners;
Charles Chana7903c82018-03-15 20:14:16 -070023import com.google.common.collect.ArrayListMultimap;
24import com.google.common.collect.ListMultimap;
25import com.google.common.collect.Multimaps;
Charles Chanc09ad6d2018-04-04 16:31:23 -070026import org.onlab.util.Tools;
27import org.onlab.util.Tools.LogLevel;
Charles Chana7903c82018-03-15 20:14:16 -070028import org.onosproject.net.DeviceId;
Charles Chan33f4a912018-04-19 23:35:30 -070029import org.onosproject.net.flowobjective.FilteringObjQueueKey;
Charles Chana7903c82018-03-15 20:14:16 -070030import org.onosproject.net.flowobjective.FilteringObjective;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070031import org.onosproject.net.flowobjective.FlowObjectiveService;
Charles Chana7903c82018-03-15 20:14:16 -070032import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
Charles Chan33f4a912018-04-19 23:35:30 -070033import org.onosproject.net.flowobjective.ForwardingObjQueueKey;
Charles Chana7903c82018-03-15 20:14:16 -070034import org.onosproject.net.flowobjective.ForwardingObjective;
Charles Chan33f4a912018-04-19 23:35:30 -070035import org.onosproject.net.flowobjective.NextObjQueueKey;
Charles Chana7903c82018-03-15 20:14:16 -070036import org.onosproject.net.flowobjective.NextObjective;
37import org.onosproject.net.flowobjective.Objective;
38import org.onosproject.net.flowobjective.ObjectiveContext;
39import org.onosproject.net.flowobjective.ObjectiveError;
40import org.onosproject.net.flowobjective.ObjectiveEvent;
Charles Chana724c5132018-11-27 21:33:33 +080041import org.onosproject.net.flowobjective.ObjectiveQueueKey;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070042import org.osgi.service.component.annotations.Activate;
43import org.osgi.service.component.annotations.Component;
44import org.osgi.service.component.annotations.Deactivate;
pier8b3aef42019-03-11 15:14:02 -070045import org.osgi.service.component.ComponentContext;
Charles Chana7903c82018-03-15 20:14:16 -070046import org.slf4j.Logger;
47import org.slf4j.LoggerFactory;
48
49import java.util.List;
Charles Chan33f4a912018-04-19 23:35:30 -070050import java.util.Map;
Charles Chana724c5132018-11-27 21:33:33 +080051import java.util.Objects;
Charles Chana7903c82018-03-15 20:14:16 -070052import java.util.Optional;
53import java.util.Set;
Charles Chana724c5132018-11-27 21:33:33 +080054import java.util.concurrent.ExecutorService;
Charles Chan45c19d72018-04-19 21:38:40 -070055import java.util.concurrent.ScheduledExecutorService;
56import java.util.concurrent.TimeUnit;
Charles Chanc51852e2019-01-07 19:59:10 -080057import java.util.concurrent.atomic.AtomicBoolean;
Charles Chan45c19d72018-04-19 21:38:40 -070058
pierventre13a72352020-10-27 16:08:48 +010059import static com.google.common.base.Strings.isNullOrEmpty;
Charles Chana724c5132018-11-27 21:33:33 +080060import static java.util.concurrent.Executors.newSingleThreadExecutor;
Charles Chan45c19d72018-04-19 21:38:40 -070061import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
62import static org.onlab.util.Tools.groupedThreads;
pierventre13a72352020-10-27 16:08:48 +010063import static org.onosproject.net.OsgiPropertyConstants.IFOM_OBJ_TIMEOUT_MS;
64import static org.onosproject.net.OsgiPropertyConstants.IFOM_OBJ_TIMEOUT_MS_DEFAULT;
Charles Chana7903c82018-03-15 20:14:16 -070065
pierventre13a72352020-10-27 16:08:48 +010066/**
67 * Provides implementation of the flow objective programming service.
68 */
69@Component(
70 immediate = true,
71 service = FlowObjectiveService.class,
72 property = {
73 IFOM_OBJ_TIMEOUT_MS + ":Integer=" + IFOM_OBJ_TIMEOUT_MS_DEFAULT
74 }
75)
Charles Chana7903c82018-03-15 20:14:16 -070076public class InOrderFlowObjectiveManager extends FlowObjectiveManager {
77 private final Logger log = LoggerFactory.getLogger(getClass());
78
pierventre13a72352020-10-27 16:08:48 +010079 /** Objective timeout. */
80 int objectiveTimeoutMs = IFOM_OBJ_TIMEOUT_MS_DEFAULT;
Charles Chan45c19d72018-04-19 21:38:40 -070081
Charles Chan33f4a912018-04-19 23:35:30 -070082 private Cache<FilteringObjQueueKey, Objective> filtObjQueueHead;
83 private Cache<ForwardingObjQueueKey, Objective> fwdObjQueueHead;
Charles Chan45c19d72018-04-19 21:38:40 -070084 private Cache<NextObjQueueKey, Objective> nextObjQueueHead;
85 private ScheduledExecutorService cacheCleaner;
Charles Chana724c5132018-11-27 21:33:33 +080086 private ExecutorService filtCacheEventExecutor;
87 private ExecutorService fwdCacheEventExecutor;
88 private ExecutorService nextCacheEventExecutor;
Charles Chan45c19d72018-04-19 21:38:40 -070089
Charles Chan33f4a912018-04-19 23:35:30 -070090 private ListMultimap<FilteringObjQueueKey, Objective> filtObjQueue =
Charles Chana7903c82018-03-15 20:14:16 -070091 Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
Charles Chan33f4a912018-04-19 23:35:30 -070092 private ListMultimap<ForwardingObjQueueKey, Objective> fwdObjQueue =
Charles Chana7903c82018-03-15 20:14:16 -070093 Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
94 private ListMultimap<NextObjQueueKey, Objective> nextObjQueue =
95 Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
96
97 final FlowObjectiveStoreDelegate delegate = new InternalStoreDelegate();
98
pierventre13a72352020-10-27 16:08:48 +010099 final RemovalListener<ObjectiveQueueKey, Objective> removalListener = notification -> {
100 Objective obj = notification.getValue();
101 switch (notification.getCause()) {
102 case EXPIRED:
103 case COLLECTED:
104 case SIZE:
105 obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
106 break;
107 case EXPLICIT: // No action when the objective completes correctly
108 case REPLACED: // No action when a pending forward or next objective gets executed
109 default:
110 break;
111 }
112 };
113
Charles Chana7903c82018-03-15 20:14:16 -0700114 @Activate
pier8b3aef42019-03-11 15:14:02 -0700115 protected void activate(ComponentContext context) {
116 super.activate(context);
Charles Chan45c19d72018-04-19 21:38:40 -0700117
pierventre13a72352020-10-27 16:08:48 +0100118 cfgService.registerProperties(InOrderFlowObjectiveManager.class);
119
Charles Chana724c5132018-11-27 21:33:33 +0800120 filtCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-filt", log));
121 fwdCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-fwd", log));
122 nextCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-next", log));
123
Charles Chan45c19d72018-04-19 21:38:40 -0700124 filtObjQueueHead = CacheBuilder.newBuilder()
pierventre13a72352020-10-27 16:08:48 +0100125 .expireAfterWrite(objectiveTimeoutMs, TimeUnit.MILLISECONDS)
Charles Chana724c5132018-11-27 21:33:33 +0800126 .removalListener(RemovalListeners.asynchronous(removalListener, filtCacheEventExecutor))
127 .build();
Charles Chan45c19d72018-04-19 21:38:40 -0700128 fwdObjQueueHead = CacheBuilder.newBuilder()
pierventre13a72352020-10-27 16:08:48 +0100129 .expireAfterWrite(objectiveTimeoutMs, TimeUnit.MILLISECONDS)
Charles Chana724c5132018-11-27 21:33:33 +0800130 .removalListener(RemovalListeners.asynchronous(removalListener, fwdCacheEventExecutor))
131 .build();
Charles Chan45c19d72018-04-19 21:38:40 -0700132 nextObjQueueHead = CacheBuilder.newBuilder()
pierventre13a72352020-10-27 16:08:48 +0100133 .expireAfterWrite(objectiveTimeoutMs, TimeUnit.MILLISECONDS)
Charles Chana724c5132018-11-27 21:33:33 +0800134 .removalListener(RemovalListeners.asynchronous(removalListener, nextCacheEventExecutor))
135 .build();
Charles Chan45c19d72018-04-19 21:38:40 -0700136
137 cacheCleaner = newSingleThreadScheduledExecutor(groupedThreads("onos/flowobj", "cache-cleaner", log));
138 cacheCleaner.scheduleAtFixedRate(() -> {
139 filtObjQueueHead.cleanUp();
140 fwdObjQueueHead.cleanUp();
141 nextObjQueueHead.cleanUp();
pierventre13a72352020-10-27 16:08:48 +0100142 }, 0, objectiveTimeoutMs, TimeUnit.MILLISECONDS);
Charles Chan45c19d72018-04-19 21:38:40 -0700143
Charles Chana7903c82018-03-15 20:14:16 -0700144 // Replace store delegate to make sure pendingForward and pendingNext are resubmitted to
Charles Chan45c19d72018-04-19 21:38:40 -0700145 // execute()
Charles Chana7903c82018-03-15 20:14:16 -0700146 flowObjectiveStore.unsetDelegate(super.delegate);
147 flowObjectiveStore.setDelegate(delegate);
148 }
149
150 @Deactivate
151 protected void deactivate() {
pierventre13a72352020-10-27 16:08:48 +0100152 cfgService.unregisterProperties(getClass(), false);
153
Charles Chan45c19d72018-04-19 21:38:40 -0700154 cacheCleaner.shutdown();
Charles Chan33f4a912018-04-19 23:35:30 -0700155 clearQueue();
Charles Chan45c19d72018-04-19 21:38:40 -0700156
Charles Chana724c5132018-11-27 21:33:33 +0800157 filtCacheEventExecutor.shutdown();
158 fwdCacheEventExecutor.shutdown();
159 nextCacheEventExecutor.shutdown();
160
Charles Chana7903c82018-03-15 20:14:16 -0700161 super.deactivate();
pierventre10c9cc42021-07-30 17:31:45 +0200162 // Due to the check in the AbstractStore we have to pass the right instance
163 // to perform a correct clean up. The unset delegate in the super class
164 // will not have any effect
165 flowObjectiveStore.unsetDelegate(delegate);
Charles Chana7903c82018-03-15 20:14:16 -0700166 }
167
168 /**
pierventre13a72352020-10-27 16:08:48 +0100169 * Extracts properties from the component configuration context.
170 *
171 * @param context the component context
172 */
173 @Override
174 protected void readComponentConfiguration(ComponentContext context) {
175 super.readComponentConfiguration(context);
176
177 // objective timeout handling
178 String propertyValue = Tools.get(context.getProperties(), IFOM_OBJ_TIMEOUT_MS);
179 int newObjectiveTimeoutMs = isNullOrEmpty(propertyValue) ?
180 objectiveTimeoutMs : Integer.parseInt(propertyValue);
181 if (newObjectiveTimeoutMs != objectiveTimeoutMs && newObjectiveTimeoutMs > 0) {
182 objectiveTimeoutMs = newObjectiveTimeoutMs;
183 log.info("Reconfigured timeout of the objectives to {}", objectiveTimeoutMs);
184 // Recreates the queues
185 if (filtObjQueueHead != null) {
186 filtObjQueueHead.invalidateAll();
187 filtObjQueueHead = null;
188 }
189 filtObjQueueHead = CacheBuilder.newBuilder()
190 .expireAfterWrite(objectiveTimeoutMs, TimeUnit.MILLISECONDS)
191 .removalListener(RemovalListeners.asynchronous(removalListener, filtCacheEventExecutor))
192 .build();
193 if (fwdObjQueueHead != null) {
194 fwdObjQueueHead.invalidateAll();
195 fwdObjQueueHead = null;
196 }
197 fwdObjQueueHead = CacheBuilder.newBuilder()
198 .expireAfterWrite(objectiveTimeoutMs, TimeUnit.MILLISECONDS)
199 .removalListener(RemovalListeners.asynchronous(removalListener, fwdCacheEventExecutor))
200 .build();
201 if (nextObjQueueHead != null) {
202 nextObjQueueHead.invalidateAll();
203 nextObjQueueHead = null;
204 }
205 nextObjQueueHead = CacheBuilder.newBuilder()
206 .expireAfterWrite(objectiveTimeoutMs, TimeUnit.MILLISECONDS)
207 .removalListener(RemovalListeners.asynchronous(removalListener, nextCacheEventExecutor))
208 .build();
209 // Restart the cleanup thread
210 if (cacheCleaner != null) {
211 cacheCleaner.shutdownNow();
212 cacheCleaner = null;
213 }
214 cacheCleaner = newSingleThreadScheduledExecutor(groupedThreads("onos/flowobj", "cache-cleaner", log));
215 cacheCleaner.scheduleAtFixedRate(() -> {
216 filtObjQueueHead.cleanUp();
217 fwdObjQueueHead.cleanUp();
218 nextObjQueueHead.cleanUp();
219 }, 0, objectiveTimeoutMs, TimeUnit.MILLISECONDS);
220 }
221 }
222
223
224 /**
Charles Chana7903c82018-03-15 20:14:16 -0700225 * Processes given objective on given device.
226 * Objectives submitted through this method are guaranteed to be executed in order.
227 *
228 * @param deviceId Device ID
229 * @param originalObjective Flow objective to be executed
230 */
231 private void process(DeviceId deviceId, Objective originalObjective) {
232 // Inject ObjectiveContext such that we can get notified when it is completed
233 Objective.Builder objBuilder = originalObjective.copy();
234 Optional<ObjectiveContext> originalContext = originalObjective.context();
Charles Chanc51852e2019-01-07 19:59:10 -0800235 ObjectiveContext context = new InOrderObjectiveContext(deviceId, originalContext.orElse(null));
Charles Chana7903c82018-03-15 20:14:16 -0700236
237 // Preserve Objective.Operation
238 Objective objective;
239 switch (originalObjective.op()) {
240 case ADD:
241 objective = objBuilder.add(context);
242 break;
243 case ADD_TO_EXISTING:
244 objective = ((NextObjective.Builder) objBuilder).addToExisting(context);
245 break;
246 case REMOVE:
247 objective = objBuilder.remove(context);
248 break;
249 case REMOVE_FROM_EXISTING:
250 objective = ((NextObjective.Builder) objBuilder).removeFromExisting(context);
251 break;
252 case MODIFY:
253 objective = ((NextObjective.Builder) objBuilder).modify(context);
254 break;
255 case VERIFY:
256 objective = ((NextObjective.Builder) objBuilder).verify(context);
257 break;
258 default:
259 log.error("Unknown flow objecitve operation {}", originalObjective.op());
260 return;
261 }
262
263 enqueue(deviceId, objective);
264 }
265
266 @Override
267 public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
268 process(deviceId, filteringObjective);
269 }
270
271 @Override
272 public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
273 process(deviceId, forwardingObjective);
274 }
275
276 @Override
277 public void next(DeviceId deviceId, NextObjective nextObjective) {
278 process(deviceId, nextObjective);
279 }
280
Charles Chan33f4a912018-04-19 23:35:30 -0700281 @Override
282 public ListMultimap<FilteringObjQueueKey, Objective> getFilteringObjQueue() {
283 return filtObjQueue;
284 }
285
286 @Override
287 public ListMultimap<ForwardingObjQueueKey, Objective> getForwardingObjQueue() {
288 return fwdObjQueue;
289 }
290
291 @Override
292 public ListMultimap<NextObjQueueKey, Objective> getNextObjQueue() {
293 return nextObjQueue;
294 }
295
296 @Override
297 public Map<FilteringObjQueueKey, Objective> getFilteringObjQueueHead() {
298 return filtObjQueueHead.asMap();
299 }
300
301 @Override
302 public Map<ForwardingObjQueueKey, Objective> getForwardingObjQueueHead() {
303 return fwdObjQueueHead.asMap();
304 }
305
306 @Override
307 public Map<NextObjQueueKey, Objective> getNextObjQueueHead() {
308 return nextObjQueueHead.asMap();
309 }
310
311 @Override
312 public void clearQueue() {
313 filtObjQueueHead.invalidateAll();
314 fwdObjQueueHead.invalidateAll();
315 nextObjQueueHead.invalidateAll();
316
317 filtObjQueueHead.cleanUp();
318 fwdObjQueueHead.cleanUp();
319 nextObjQueueHead.cleanUp();
320
321 filtObjQueue.clear();
322 fwdObjQueue.clear();
323 nextObjQueue.clear();
324 }
325
Charles Chana7903c82018-03-15 20:14:16 -0700326 /**
327 * Enqueue flow objective. Execute the flow objective if there is no pending objective ahead.
328 *
329 * @param deviceId Device ID
330 * @param obj Flow objective
331 */
332 private synchronized void enqueue(DeviceId deviceId, Objective obj) {
333 int queueSize;
334 int priority = obj.priority();
335
Charles Chanc09ad6d2018-04-04 16:31:23 -0700336 LogLevel logLevel = (obj.op() == Objective.Operation.VERIFY) ? LogLevel.TRACE : LogLevel.DEBUG;
337 Tools.log(log, logLevel, "Enqueue {}", obj);
338
Charles Chana7903c82018-03-15 20:14:16 -0700339 if (obj instanceof FilteringObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700340 FilteringObjQueueKey k = new FilteringObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
Charles Chana7903c82018-03-15 20:14:16 -0700341 filtObjQueue.put(k, obj);
342 queueSize = filtObjQueue.get(k).size();
Charles Chana7903c82018-03-15 20:14:16 -0700343 } else if (obj instanceof ForwardingObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700344 ForwardingObjQueueKey k =
345 new ForwardingObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
Charles Chana7903c82018-03-15 20:14:16 -0700346 fwdObjQueue.put(k, obj);
347 queueSize = fwdObjQueue.get(k).size();
Charles Chana7903c82018-03-15 20:14:16 -0700348 } else if (obj instanceof NextObjective) {
Charles Chana7903c82018-03-15 20:14:16 -0700349 NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
350 nextObjQueue.put(k, obj);
351 queueSize = nextObjQueue.get(k).size();
Charles Chana7903c82018-03-15 20:14:16 -0700352 } else {
353 log.error("Unknown flow objective instance: {}", obj.getClass().getName());
354 return;
355 }
Charles Chanc09ad6d2018-04-04 16:31:23 -0700356 log.trace("{} queue size {}", obj.getClass().getSimpleName(), queueSize);
Charles Chana7903c82018-03-15 20:14:16 -0700357
358 // Execute immediately if there is no pending obj ahead
359 if (queueSize == 1) {
Charles Chana7903c82018-03-15 20:14:16 -0700360 execute(deviceId, obj);
361 }
362 }
363
364 /**
365 * Dequeue flow objective. Execute the next flow objective in the queue, if any.
366 *
367 * @param deviceId Device ID
368 * @param obj Flow objective
Charles Chan35b589c2018-04-26 16:00:23 -0400369 * @param error ObjectiveError that triggers this dequeue. Null if this is not triggered by an error.
Charles Chana7903c82018-03-15 20:14:16 -0700370 */
Charles Chan35b589c2018-04-26 16:00:23 -0400371 private synchronized void dequeue(DeviceId deviceId, Objective obj, ObjectiveError error) {
Charles Chana7903c82018-03-15 20:14:16 -0700372 List<Objective> remaining;
373 int priority = obj.priority();
374
Charles Chanc09ad6d2018-04-04 16:31:23 -0700375 LogLevel logLevel = (obj.op() == Objective.Operation.VERIFY) ? LogLevel.TRACE : LogLevel.DEBUG;
376 Tools.log(log, logLevel, "Dequeue {}", obj);
377
Charles Chana7903c82018-03-15 20:14:16 -0700378 if (obj instanceof FilteringObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700379 FilteringObjQueueKey k = new FilteringObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
Charles Chana724c5132018-11-27 21:33:33 +0800380 if (!Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
Charles Chan5cf77f52018-11-07 15:05:59 -0800381 filtObjQueueHead.invalidate(k);
382 }
Charles Chana7903c82018-03-15 20:14:16 -0700383 filtObjQueue.remove(k, obj);
384 remaining = filtObjQueue.get(k);
Charles Chana7903c82018-03-15 20:14:16 -0700385 } else if (obj instanceof ForwardingObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700386 ForwardingObjQueueKey k =
387 new ForwardingObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
Charles Chana724c5132018-11-27 21:33:33 +0800388 if (!Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
Charles Chan5cf77f52018-11-07 15:05:59 -0800389 fwdObjQueueHead.invalidate(k);
390 }
Charles Chana7903c82018-03-15 20:14:16 -0700391 fwdObjQueue.remove(k, obj);
392 remaining = fwdObjQueue.get(k);
Charles Chana7903c82018-03-15 20:14:16 -0700393 } else if (obj instanceof NextObjective) {
Charles Chan35b589c2018-04-26 16:00:23 -0400394 if (error != null) {
395 // Remove pendingForwards and pendingNexts if next objective failed
396 Set<PendingFlowObjective> removedForwards = pendingForwards.remove(obj.id());
397 List<PendingFlowObjective> removedNexts = pendingNexts.remove(obj.id());
398
399 if (removedForwards != null) {
400 removedForwards.stream().map(PendingFlowObjective::flowObjective)
401 .forEach(pendingObj -> pendingObj.context().ifPresent(c ->
402 c.onError(pendingObj, error)));
403 }
404 if (removedNexts != null) {
405 removedNexts.stream().map(PendingFlowObjective::flowObjective)
406 .forEach(pendingObj -> pendingObj.context().ifPresent(c ->
407 c.onError(pendingObj, error)));
408 }
409 }
Charles Chana7903c82018-03-15 20:14:16 -0700410 NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
Charles Chana724c5132018-11-27 21:33:33 +0800411 if (!Objects.equals(ObjectiveError.INSTALLATIONTIMEOUT, error)) {
Charles Chan5cf77f52018-11-07 15:05:59 -0800412 nextObjQueueHead.invalidate(k);
413 }
Charles Chana7903c82018-03-15 20:14:16 -0700414 nextObjQueue.remove(k, obj);
415 remaining = nextObjQueue.get(k);
Charles Chana7903c82018-03-15 20:14:16 -0700416 } else {
417 log.error("Unknown flow objective instance: {}", obj.getClass().getName());
418 return;
419 }
Charles Chanc09ad6d2018-04-04 16:31:23 -0700420 log.trace("{} queue size {}", obj.getClass().getSimpleName(), remaining.size());
Charles Chana7903c82018-03-15 20:14:16 -0700421
422 // Submit the next one in the queue, if any
423 if (remaining.size() > 0) {
Charles Chana7903c82018-03-15 20:14:16 -0700424 execute(deviceId, remaining.get(0));
425 }
426 }
427
428 /**
429 * Submit the flow objective. Starting from this point on, the execution order is not guaranteed.
430 * Therefore we must be certain that this method is called in-order.
431 *
432 * @param deviceId Device ID
433 * @param obj Flow objective
434 */
435 private void execute(DeviceId deviceId, Objective obj) {
Charles Chanc09ad6d2018-04-04 16:31:23 -0700436 LogLevel logLevel = (obj.op() == Objective.Operation.VERIFY) ? LogLevel.TRACE : LogLevel.DEBUG;
437 Tools.log(log, logLevel, "Submit objective installer, deviceId {}, obj {}", deviceId, obj);
438
Charles Chan45c19d72018-04-19 21:38:40 -0700439 int priority = obj.priority();
Charles Chana7903c82018-03-15 20:14:16 -0700440 if (obj instanceof FilteringObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700441 FilteringObjQueueKey k = new FilteringObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
Charles Chan45c19d72018-04-19 21:38:40 -0700442 filtObjQueueHead.put(k, obj);
Charles Chana7903c82018-03-15 20:14:16 -0700443 super.filter(deviceId, (FilteringObjective) obj);
444 } else if (obj instanceof ForwardingObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700445 ForwardingObjQueueKey k =
446 new ForwardingObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
Charles Chan45c19d72018-04-19 21:38:40 -0700447 fwdObjQueueHead.put(k, obj);
Charles Chana7903c82018-03-15 20:14:16 -0700448 super.forward(deviceId, (ForwardingObjective) obj);
449 } else if (obj instanceof NextObjective) {
Charles Chan45c19d72018-04-19 21:38:40 -0700450 NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
451 nextObjQueueHead.put(k, obj);
Charles Chana7903c82018-03-15 20:14:16 -0700452 super.next(deviceId, (NextObjective) obj);
453 } else {
454 log.error("Unknown flow objective instance: {}", obj.getClass().getName());
455 }
456 }
457
458 private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
459 @Override
460 public void notify(ObjectiveEvent event) {
461 if (event.type() == ObjectiveEvent.Type.ADD) {
462 log.debug("Received notification of obj event {}", event);
463 Set<PendingFlowObjective> pending;
464
465 // first send all pending flows
466 synchronized (pendingForwards) {
467 // needs to be synchronized for queueObjective lookup
468 pending = pendingForwards.remove(event.subject());
469 }
470 if (pending == null) {
Charles Chanc09ad6d2018-04-04 16:31:23 -0700471 log.debug("No forwarding objectives pending for this obj event {}", event);
Charles Chana7903c82018-03-15 20:14:16 -0700472 } else {
473 log.debug("Processing {} pending forwarding objectives for nextId {}",
474 pending.size(), event.subject());
Charles Chan45c19d72018-04-19 21:38:40 -0700475 // execute pending forwards one by one
Charles Chana7903c82018-03-15 20:14:16 -0700476 pending.forEach(p -> execute(p.deviceId(), p.flowObjective()));
477 }
478
479 // now check for pending next-objectives
480 // Note: This is still necessary despite the existence of in-order execution.
481 // Since the in-order execution does not handle the case of
482 // ADD_TO_EXISTING coming before ADD
483 List<PendingFlowObjective> pendNexts;
484 synchronized (pendingNexts) {
485 // needs to be synchronized for queueObjective lookup
486 pendNexts = pendingNexts.remove(event.subject());
487 }
488 if (pendNexts == null) {
Charles Chanc09ad6d2018-04-04 16:31:23 -0700489 log.debug("No next objectives pending for this obj event {}", event);
Charles Chana7903c82018-03-15 20:14:16 -0700490 } else {
491 log.debug("Processing {} pending next objectives for nextId {}",
492 pendNexts.size(), event.subject());
Charles Chan45c19d72018-04-19 21:38:40 -0700493 // execute pending nexts one by one
Charles Chana7903c82018-03-15 20:14:16 -0700494 pendNexts.forEach(p -> execute(p.deviceId(), p.flowObjective()));
495 }
496 }
497 }
498 }
Charles Chanc51852e2019-01-07 19:59:10 -0800499
500 final class InOrderObjectiveContext implements ObjectiveContext {
501 private final DeviceId deviceId;
502 private final ObjectiveContext originalContext;
Charles Chan0a6330f2019-01-16 11:11:02 -0800503 // Prevent onSuccess from being executed after onError is called
504 // i.e. when the context actually succeed after the cache timeout
505 private final AtomicBoolean failed;
Charles Chanc51852e2019-01-07 19:59:10 -0800506
507 InOrderObjectiveContext(DeviceId deviceId, ObjectiveContext originalContext) {
508 this.deviceId = deviceId;
509 this.originalContext = originalContext;
Charles Chan0a6330f2019-01-16 11:11:02 -0800510 this.failed = new AtomicBoolean(false);
Charles Chanc51852e2019-01-07 19:59:10 -0800511 }
512
513 @Override
514 public void onSuccess(Objective objective) {
515 log.trace("Flow objective onSuccess {}", objective);
516
Charles Chan0a6330f2019-01-16 11:11:02 -0800517 if (!failed.get()) {
Charles Chanc51852e2019-01-07 19:59:10 -0800518 dequeue(deviceId, objective, null);
519 if (originalContext != null) {
520 originalContext.onSuccess(objective);
521 }
522 }
523
524 }
525 @Override
526 public void onError(Objective objective, ObjectiveError error) {
527 log.warn("Flow objective onError {}. Reason = {}", objective, error);
528
Charles Chan0a6330f2019-01-16 11:11:02 -0800529 if (!failed.getAndSet(true)) {
Charles Chanc51852e2019-01-07 19:59:10 -0800530 dequeue(deviceId, objective, error);
531 if (originalContext != null) {
532 originalContext.onError(objective, error);
533 }
534 }
535 }
536 }
Charles Chana7903c82018-03-15 20:14:16 -0700537}