blob: 5eb3ac09e2e2f66f6e45b0880c634b2c35f30991 [file] [log] [blame]
Charles Chana7903c82018-03-15 20:14:16 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.net.flowobjective.impl;
18
Charles Chan45c19d72018-04-19 21:38:40 -070019import com.google.common.cache.Cache;
20import com.google.common.cache.CacheBuilder;
21import com.google.common.cache.RemovalNotification;
Charles Chana7903c82018-03-15 20:14:16 -070022import com.google.common.collect.ArrayListMultimap;
23import com.google.common.collect.ListMultimap;
24import com.google.common.collect.Multimaps;
25import org.apache.felix.scr.annotations.Activate;
26import org.apache.felix.scr.annotations.Component;
27import org.apache.felix.scr.annotations.Deactivate;
28import org.apache.felix.scr.annotations.Service;
Charles Chanc09ad6d2018-04-04 16:31:23 -070029import org.onlab.util.Tools;
30import org.onlab.util.Tools.LogLevel;
Charles Chana7903c82018-03-15 20:14:16 -070031import org.onosproject.net.DeviceId;
Charles Chan33f4a912018-04-19 23:35:30 -070032import org.onosproject.net.flowobjective.FilteringObjQueueKey;
Charles Chana7903c82018-03-15 20:14:16 -070033import org.onosproject.net.flowobjective.FilteringObjective;
34import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
Charles Chan33f4a912018-04-19 23:35:30 -070035import org.onosproject.net.flowobjective.ForwardingObjQueueKey;
Charles Chana7903c82018-03-15 20:14:16 -070036import org.onosproject.net.flowobjective.ForwardingObjective;
Charles Chan33f4a912018-04-19 23:35:30 -070037import org.onosproject.net.flowobjective.NextObjQueueKey;
Charles Chana7903c82018-03-15 20:14:16 -070038import org.onosproject.net.flowobjective.NextObjective;
39import org.onosproject.net.flowobjective.Objective;
40import org.onosproject.net.flowobjective.ObjectiveContext;
41import org.onosproject.net.flowobjective.ObjectiveError;
42import org.onosproject.net.flowobjective.ObjectiveEvent;
43import org.slf4j.Logger;
44import org.slf4j.LoggerFactory;
45
46import java.util.List;
Charles Chan33f4a912018-04-19 23:35:30 -070047import java.util.Map;
Charles Chana7903c82018-03-15 20:14:16 -070048import java.util.Optional;
49import java.util.Set;
Charles Chan45c19d72018-04-19 21:38:40 -070050import java.util.concurrent.ScheduledExecutorService;
51import java.util.concurrent.TimeUnit;
52
53import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
54import static org.onlab.util.Tools.groupedThreads;
Charles Chana7903c82018-03-15 20:14:16 -070055
56@Component(immediate = true, enabled = true)
57@Service
58public class InOrderFlowObjectiveManager extends FlowObjectiveManager {
59 private final Logger log = LoggerFactory.getLogger(getClass());
60
Charles Chan45c19d72018-04-19 21:38:40 -070061 // TODO Make queue timeout configurable
62 static final int OBJ_TIMEOUT_MS = 5000;
63
Charles Chan33f4a912018-04-19 23:35:30 -070064 private Cache<FilteringObjQueueKey, Objective> filtObjQueueHead;
65 private Cache<ForwardingObjQueueKey, Objective> fwdObjQueueHead;
Charles Chan45c19d72018-04-19 21:38:40 -070066 private Cache<NextObjQueueKey, Objective> nextObjQueueHead;
67 private ScheduledExecutorService cacheCleaner;
68
Charles Chan33f4a912018-04-19 23:35:30 -070069 private ListMultimap<FilteringObjQueueKey, Objective> filtObjQueue =
Charles Chana7903c82018-03-15 20:14:16 -070070 Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
Charles Chan33f4a912018-04-19 23:35:30 -070071 private ListMultimap<ForwardingObjQueueKey, Objective> fwdObjQueue =
Charles Chana7903c82018-03-15 20:14:16 -070072 Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
73 private ListMultimap<NextObjQueueKey, Objective> nextObjQueue =
74 Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
75
76 final FlowObjectiveStoreDelegate delegate = new InternalStoreDelegate();
77
78 @Activate
79 protected void activate() {
80 super.activate();
Charles Chan45c19d72018-04-19 21:38:40 -070081
82 // TODO Clean up duplicated code
83 filtObjQueueHead = CacheBuilder.newBuilder()
84 .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
Charles Chan33f4a912018-04-19 23:35:30 -070085 .removalListener((RemovalNotification<FilteringObjQueueKey, Objective> notification) -> {
Charles Chan45c19d72018-04-19 21:38:40 -070086 Objective obj = notification.getValue();
87 switch (notification.getCause()) {
88 case EXPIRED:
89 case COLLECTED:
90 case SIZE:
91 obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
92 break;
93 case EXPLICIT: // No action when the objective completes correctly
94 case REPLACED: // No action when a pending forward or next objective gets executed
95 default:
96 break;
97 }
98 }).build();
99 fwdObjQueueHead = CacheBuilder.newBuilder()
100 .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
Charles Chan33f4a912018-04-19 23:35:30 -0700101 .removalListener((RemovalNotification<ForwardingObjQueueKey, Objective> notification) -> {
Charles Chan45c19d72018-04-19 21:38:40 -0700102 Objective obj = notification.getValue();
103 switch (notification.getCause()) {
104 case EXPIRED:
105 case COLLECTED:
106 case SIZE:
107 obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
108 break;
109 case EXPLICIT: // No action when the objective completes correctly
110 case REPLACED: // No action when a pending forward or next objective gets executed
111 default:
112 break;
113 }
114 }).build();
115 nextObjQueueHead = CacheBuilder.newBuilder()
116 .expireAfterWrite(OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
117 .removalListener((RemovalNotification<NextObjQueueKey, Objective> notification) -> {
118 Objective obj = notification.getValue();
119 switch (notification.getCause()) {
120 case EXPIRED:
121 case COLLECTED:
122 case SIZE:
123 obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.INSTALLATIONTIMEOUT));
124 break;
125 case EXPLICIT: // No action when the objective completes correctly
126 case REPLACED: // No action when a pending forward or next objective gets executed
127 default:
128 break;
129 }
130 }).build();
131
132 cacheCleaner = newSingleThreadScheduledExecutor(groupedThreads("onos/flowobj", "cache-cleaner", log));
133 cacheCleaner.scheduleAtFixedRate(() -> {
134 filtObjQueueHead.cleanUp();
135 fwdObjQueueHead.cleanUp();
136 nextObjQueueHead.cleanUp();
137 }, 0, OBJ_TIMEOUT_MS, TimeUnit.MILLISECONDS);
138
Charles Chana7903c82018-03-15 20:14:16 -0700139 // Replace store delegate to make sure pendingForward and pendingNext are resubmitted to
Charles Chan45c19d72018-04-19 21:38:40 -0700140 // execute()
Charles Chana7903c82018-03-15 20:14:16 -0700141 flowObjectiveStore.unsetDelegate(super.delegate);
142 flowObjectiveStore.setDelegate(delegate);
143 }
144
145 @Deactivate
146 protected void deactivate() {
Charles Chan45c19d72018-04-19 21:38:40 -0700147 cacheCleaner.shutdown();
Charles Chan33f4a912018-04-19 23:35:30 -0700148 clearQueue();
Charles Chan45c19d72018-04-19 21:38:40 -0700149
Charles Chana7903c82018-03-15 20:14:16 -0700150 super.deactivate();
151 }
152
153 /**
154 * Processes given objective on given device.
155 * Objectives submitted through this method are guaranteed to be executed in order.
156 *
157 * @param deviceId Device ID
158 * @param originalObjective Flow objective to be executed
159 */
160 private void process(DeviceId deviceId, Objective originalObjective) {
161 // Inject ObjectiveContext such that we can get notified when it is completed
162 Objective.Builder objBuilder = originalObjective.copy();
163 Optional<ObjectiveContext> originalContext = originalObjective.context();
164 ObjectiveContext context = new ObjectiveContext() {
165 @Override
166 public void onSuccess(Objective objective) {
167 log.trace("Flow objective onSuccess {}", objective);
168 dequeue(deviceId, objective);
169 originalContext.ifPresent(c -> c.onSuccess(objective));
170 }
171 @Override
172 public void onError(Objective objective, ObjectiveError error) {
Charles Chan45c19d72018-04-19 21:38:40 -0700173 log.warn("Flow objective onError {}", objective);
Charles Chana7903c82018-03-15 20:14:16 -0700174 dequeue(deviceId, objective);
175 originalContext.ifPresent(c -> c.onError(objective, error));
176 }
177 };
178
179 // Preserve Objective.Operation
180 Objective objective;
181 switch (originalObjective.op()) {
182 case ADD:
183 objective = objBuilder.add(context);
184 break;
185 case ADD_TO_EXISTING:
186 objective = ((NextObjective.Builder) objBuilder).addToExisting(context);
187 break;
188 case REMOVE:
189 objective = objBuilder.remove(context);
190 break;
191 case REMOVE_FROM_EXISTING:
192 objective = ((NextObjective.Builder) objBuilder).removeFromExisting(context);
193 break;
194 case MODIFY:
195 objective = ((NextObjective.Builder) objBuilder).modify(context);
196 break;
197 case VERIFY:
198 objective = ((NextObjective.Builder) objBuilder).verify(context);
199 break;
200 default:
201 log.error("Unknown flow objecitve operation {}", originalObjective.op());
202 return;
203 }
204
205 enqueue(deviceId, objective);
206 }
207
208 @Override
209 public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
210 process(deviceId, filteringObjective);
211 }
212
213 @Override
214 public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
215 process(deviceId, forwardingObjective);
216 }
217
218 @Override
219 public void next(DeviceId deviceId, NextObjective nextObjective) {
220 process(deviceId, nextObjective);
221 }
222
Charles Chan33f4a912018-04-19 23:35:30 -0700223 @Override
224 public ListMultimap<FilteringObjQueueKey, Objective> getFilteringObjQueue() {
225 return filtObjQueue;
226 }
227
228 @Override
229 public ListMultimap<ForwardingObjQueueKey, Objective> getForwardingObjQueue() {
230 return fwdObjQueue;
231 }
232
233 @Override
234 public ListMultimap<NextObjQueueKey, Objective> getNextObjQueue() {
235 return nextObjQueue;
236 }
237
238 @Override
239 public Map<FilteringObjQueueKey, Objective> getFilteringObjQueueHead() {
240 return filtObjQueueHead.asMap();
241 }
242
243 @Override
244 public Map<ForwardingObjQueueKey, Objective> getForwardingObjQueueHead() {
245 return fwdObjQueueHead.asMap();
246 }
247
248 @Override
249 public Map<NextObjQueueKey, Objective> getNextObjQueueHead() {
250 return nextObjQueueHead.asMap();
251 }
252
253 @Override
254 public void clearQueue() {
255 filtObjQueueHead.invalidateAll();
256 fwdObjQueueHead.invalidateAll();
257 nextObjQueueHead.invalidateAll();
258
259 filtObjQueueHead.cleanUp();
260 fwdObjQueueHead.cleanUp();
261 nextObjQueueHead.cleanUp();
262
263 filtObjQueue.clear();
264 fwdObjQueue.clear();
265 nextObjQueue.clear();
266 }
267
Charles Chana7903c82018-03-15 20:14:16 -0700268 /**
269 * Enqueue flow objective. Execute the flow objective if there is no pending objective ahead.
270 *
271 * @param deviceId Device ID
272 * @param obj Flow objective
273 */
274 private synchronized void enqueue(DeviceId deviceId, Objective obj) {
275 int queueSize;
276 int priority = obj.priority();
277
Charles Chanc09ad6d2018-04-04 16:31:23 -0700278 LogLevel logLevel = (obj.op() == Objective.Operation.VERIFY) ? LogLevel.TRACE : LogLevel.DEBUG;
279 Tools.log(log, logLevel, "Enqueue {}", obj);
280
Charles Chana7903c82018-03-15 20:14:16 -0700281 if (obj instanceof FilteringObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700282 FilteringObjQueueKey k = new FilteringObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
Charles Chana7903c82018-03-15 20:14:16 -0700283 filtObjQueue.put(k, obj);
284 queueSize = filtObjQueue.get(k).size();
Charles Chana7903c82018-03-15 20:14:16 -0700285 } else if (obj instanceof ForwardingObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700286 ForwardingObjQueueKey k =
287 new ForwardingObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
Charles Chana7903c82018-03-15 20:14:16 -0700288 fwdObjQueue.put(k, obj);
289 queueSize = fwdObjQueue.get(k).size();
Charles Chana7903c82018-03-15 20:14:16 -0700290 } else if (obj instanceof NextObjective) {
Charles Chana7903c82018-03-15 20:14:16 -0700291 NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
292 nextObjQueue.put(k, obj);
293 queueSize = nextObjQueue.get(k).size();
Charles Chana7903c82018-03-15 20:14:16 -0700294 } else {
295 log.error("Unknown flow objective instance: {}", obj.getClass().getName());
296 return;
297 }
Charles Chanc09ad6d2018-04-04 16:31:23 -0700298 log.trace("{} queue size {}", obj.getClass().getSimpleName(), queueSize);
Charles Chana7903c82018-03-15 20:14:16 -0700299
300 // Execute immediately if there is no pending obj ahead
301 if (queueSize == 1) {
Charles Chana7903c82018-03-15 20:14:16 -0700302 execute(deviceId, obj);
303 }
304 }
305
306 /**
307 * Dequeue flow objective. Execute the next flow objective in the queue, if any.
308 *
309 * @param deviceId Device ID
310 * @param obj Flow objective
311 */
312 private synchronized void dequeue(DeviceId deviceId, Objective obj) {
313 List<Objective> remaining;
314 int priority = obj.priority();
315
Charles Chanc09ad6d2018-04-04 16:31:23 -0700316 LogLevel logLevel = (obj.op() == Objective.Operation.VERIFY) ? LogLevel.TRACE : LogLevel.DEBUG;
317 Tools.log(log, logLevel, "Dequeue {}", obj);
318
Charles Chana7903c82018-03-15 20:14:16 -0700319 if (obj instanceof FilteringObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700320 FilteringObjQueueKey k = new FilteringObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
Charles Chan45c19d72018-04-19 21:38:40 -0700321 filtObjQueueHead.invalidate(k);
Charles Chana7903c82018-03-15 20:14:16 -0700322 filtObjQueue.remove(k, obj);
323 remaining = filtObjQueue.get(k);
Charles Chana7903c82018-03-15 20:14:16 -0700324 } else if (obj instanceof ForwardingObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700325 ForwardingObjQueueKey k =
326 new ForwardingObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
Charles Chan45c19d72018-04-19 21:38:40 -0700327 fwdObjQueueHead.invalidate(k);
Charles Chana7903c82018-03-15 20:14:16 -0700328 fwdObjQueue.remove(k, obj);
329 remaining = fwdObjQueue.get(k);
Charles Chana7903c82018-03-15 20:14:16 -0700330 } else if (obj instanceof NextObjective) {
Charles Chana7903c82018-03-15 20:14:16 -0700331 NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
Charles Chan45c19d72018-04-19 21:38:40 -0700332 nextObjQueueHead.invalidate(k);
Charles Chana7903c82018-03-15 20:14:16 -0700333 nextObjQueue.remove(k, obj);
334 remaining = nextObjQueue.get(k);
Charles Chana7903c82018-03-15 20:14:16 -0700335 } else {
336 log.error("Unknown flow objective instance: {}", obj.getClass().getName());
337 return;
338 }
Charles Chanc09ad6d2018-04-04 16:31:23 -0700339 log.trace("{} queue size {}", obj.getClass().getSimpleName(), remaining.size());
Charles Chana7903c82018-03-15 20:14:16 -0700340
341 // Submit the next one in the queue, if any
342 if (remaining.size() > 0) {
Charles Chana7903c82018-03-15 20:14:16 -0700343 execute(deviceId, remaining.get(0));
344 }
345 }
346
347 /**
348 * Submit the flow objective. Starting from this point on, the execution order is not guaranteed.
349 * Therefore we must be certain that this method is called in-order.
350 *
351 * @param deviceId Device ID
352 * @param obj Flow objective
353 */
354 private void execute(DeviceId deviceId, Objective obj) {
Charles Chanc09ad6d2018-04-04 16:31:23 -0700355 LogLevel logLevel = (obj.op() == Objective.Operation.VERIFY) ? LogLevel.TRACE : LogLevel.DEBUG;
356 Tools.log(log, logLevel, "Submit objective installer, deviceId {}, obj {}", deviceId, obj);
357
Charles Chan45c19d72018-04-19 21:38:40 -0700358 int priority = obj.priority();
Charles Chana7903c82018-03-15 20:14:16 -0700359 if (obj instanceof FilteringObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700360 FilteringObjQueueKey k = new FilteringObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
Charles Chan45c19d72018-04-19 21:38:40 -0700361 filtObjQueueHead.put(k, obj);
Charles Chana7903c82018-03-15 20:14:16 -0700362 super.filter(deviceId, (FilteringObjective) obj);
363 } else if (obj instanceof ForwardingObjective) {
Charles Chan33f4a912018-04-19 23:35:30 -0700364 ForwardingObjQueueKey k =
365 new ForwardingObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
Charles Chan45c19d72018-04-19 21:38:40 -0700366 fwdObjQueueHead.put(k, obj);
Charles Chana7903c82018-03-15 20:14:16 -0700367 super.forward(deviceId, (ForwardingObjective) obj);
368 } else if (obj instanceof NextObjective) {
Charles Chan45c19d72018-04-19 21:38:40 -0700369 NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
370 nextObjQueueHead.put(k, obj);
Charles Chana7903c82018-03-15 20:14:16 -0700371 super.next(deviceId, (NextObjective) obj);
372 } else {
373 log.error("Unknown flow objective instance: {}", obj.getClass().getName());
374 }
375 }
376
377 private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
378 @Override
379 public void notify(ObjectiveEvent event) {
380 if (event.type() == ObjectiveEvent.Type.ADD) {
381 log.debug("Received notification of obj event {}", event);
382 Set<PendingFlowObjective> pending;
383
384 // first send all pending flows
385 synchronized (pendingForwards) {
386 // needs to be synchronized for queueObjective lookup
387 pending = pendingForwards.remove(event.subject());
388 }
389 if (pending == null) {
Charles Chanc09ad6d2018-04-04 16:31:23 -0700390 log.debug("No forwarding objectives pending for this obj event {}", event);
Charles Chana7903c82018-03-15 20:14:16 -0700391 } else {
392 log.debug("Processing {} pending forwarding objectives for nextId {}",
393 pending.size(), event.subject());
Charles Chan45c19d72018-04-19 21:38:40 -0700394 // execute pending forwards one by one
Charles Chana7903c82018-03-15 20:14:16 -0700395 pending.forEach(p -> execute(p.deviceId(), p.flowObjective()));
396 }
397
398 // now check for pending next-objectives
399 // Note: This is still necessary despite the existence of in-order execution.
400 // Since the in-order execution does not handle the case of
401 // ADD_TO_EXISTING coming before ADD
402 List<PendingFlowObjective> pendNexts;
403 synchronized (pendingNexts) {
404 // needs to be synchronized for queueObjective lookup
405 pendNexts = pendingNexts.remove(event.subject());
406 }
407 if (pendNexts == null) {
Charles Chanc09ad6d2018-04-04 16:31:23 -0700408 log.debug("No next objectives pending for this obj event {}", event);
Charles Chana7903c82018-03-15 20:14:16 -0700409 } else {
410 log.debug("Processing {} pending next objectives for nextId {}",
411 pendNexts.size(), event.subject());
Charles Chan45c19d72018-04-19 21:38:40 -0700412 // execute pending nexts one by one
Charles Chana7903c82018-03-15 20:14:16 -0700413 pendNexts.forEach(p -> execute(p.deviceId(), p.flowObjective()));
414 }
415 }
416 }
417 }
Charles Chana7903c82018-03-15 20:14:16 -0700418}