blob: 3ef98bd874d21043adaba458fcaf2accba67d308 [file] [log] [blame]
Xin Jin313708b2015-07-09 13:43:04 -07001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.net.flowobjective.impl.composition;
17
18import com.google.common.collect.Maps;
19import com.google.common.collect.Sets;
20import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Deactivate;
23import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
25import org.apache.felix.scr.annotations.Service;
26import org.onlab.osgi.DefaultServiceDirectory;
27import org.onlab.osgi.ServiceDirectory;
28import org.onlab.util.ItemNotFoundException;
29import org.onosproject.cluster.ClusterService;
Xin Jin313708b2015-07-09 13:43:04 -070030import org.onosproject.mastership.MastershipEvent;
31import org.onosproject.mastership.MastershipListener;
32import org.onosproject.mastership.MastershipService;
33import org.onosproject.net.DeviceId;
34import org.onosproject.net.behaviour.Pipeliner;
35import org.onosproject.net.behaviour.PipelinerContext;
36import org.onosproject.net.device.DeviceEvent;
37import org.onosproject.net.device.DeviceListener;
38import org.onosproject.net.device.DeviceService;
39import org.onosproject.net.driver.DefaultDriverProviderService;
40import org.onosproject.net.driver.DriverHandler;
41import org.onosproject.net.driver.DriverService;
42import org.onosproject.net.flow.FlowRuleService;
43import org.onosproject.net.flow.criteria.Criterion;
44import org.onosproject.net.flow.instructions.Instruction;
45import org.onosproject.net.flowobjective.FilteringObjective;
46import org.onosproject.net.flowobjective.FlowObjectiveService;
47import org.onosproject.net.flowobjective.FlowObjectiveStore;
48import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
49import org.onosproject.net.flowobjective.ForwardingObjective;
50import org.onosproject.net.flowobjective.NextObjective;
51import org.onosproject.net.flowobjective.Objective;
52import org.onosproject.net.flowobjective.ObjectiveError;
53import org.onosproject.net.flowobjective.ObjectiveEvent;
54import org.onosproject.net.group.GroupService;
55import org.slf4j.Logger;
56import org.slf4j.LoggerFactory;
57
58import java.util.List;
59import java.util.Map;
60import java.util.Set;
61import java.util.concurrent.ExecutorService;
62
63import static com.google.common.base.Preconditions.checkNotNull;
64import static java.util.concurrent.Executors.newFixedThreadPool;
65import static org.onlab.util.Tools.groupedThreads;
66import static org.onosproject.security.AppGuard.checkPermission;
Changhoon Yoonb856b812015-08-10 03:47:19 +090067import static org.onosproject.security.AppPermission.Type.*;
Xin Jin313708b2015-07-09 13:43:04 -070068
69
70/**
71 * Provides implementation of the flow objective programming service with composition feature.
72 *
73 * Note: This is an experimental, alternative implementation of the FlowObjectiveManager
74 * that supports composition. It can be enabled by setting the enable flag below to true,
75 * and you should also add "enabled = false" to the FlowObjectiveManager.
76 *
 * The implementation relies on a FlowObjectiveCompositionTree that is not yet distributed,
78 * so it will not have high availability and may break if device mastership changes.
79 * Therefore, it is safest to use this component in a single instance scenario.
80 * This comment will be removed when a distributed implementation is available.
81 */
82@Component(immediate = true, enabled = false)
83@Service
84public class FlowObjectiveCompositionManager implements FlowObjectiveService {
85
    /**
     * Operator kinds for flow objective composition policies.
     *
     * NOTE(review): this enum is not referenced anywhere else in this file;
     * presumably it is consumed by the policy parser / composition tree.
     * Confirm the exact semantics of each constant against those classes.
     */
    public enum PolicyOperator {
        Parallel,
        Sequential,
        Override,
        Application
    }
92
    // Upper bound on the attempt counter used by ObjectiveInstaller before it
    // gives up and reports an error for the objective.
    public static final int INSTALL_RETRY_ATTEMPTS = 5;
    // Time an ObjectiveInstaller sleeps before re-submitting itself.
    public static final long INSTALL_RETRY_INTERVAL = 1000; // ms

    private final Logger log = LoggerFactory.getLogger(getClass());

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected DriverService driverService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected DeviceService deviceService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected MastershipService mastershipService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterService clusterService;

    // Note: The following dependencies are added on behalf of the pipeline
    // driver behaviours to assure these services are available for their
    // initialization.
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected FlowRuleService flowRuleService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected GroupService groupService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected FlowObjectiveStore flowObjectiveStore;

    // Note: This must remain an optional dependency to allow re-install of default drivers.
    // Note: For now disabled until we can move to OPTIONAL_UNARY dependency
    // @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY, policy = ReferencePolicy.DYNAMIC)
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected DefaultDriverProviderService defaultDriverService;

    // Delegate registered with the flow objective store in activate().
    private final FlowObjectiveStoreDelegate delegate = new InternalStoreDelegate();

    // Per-device caches filled in by setupPipelineHandler().
    private final Map<DeviceId, DriverHandler> driverHandlers = Maps.newConcurrentMap();
    private final Map<DeviceId, Pipeliner> pipeliners = Maps.newConcurrentMap();

    // Context handed to every pipeliner on init, and the listeners that
    // trigger pipeliner setup on mastership / device events.
    private final PipelinerContext context = new InnerPipelineContext();
    private final MastershipListener mastershipListener = new InnerMastershipListener();
    private final DeviceListener deviceListener = new InnerDeviceListener();

    protected ServiceDirectory serviceDirectory = new DefaultServiceDirectory();

    // Forwarding objectives waiting for their next objective, keyed by next id.
    // Entries are added by queueObjective() and drained by the store delegate.
    private Map<Integer, Set<PendingNext>> pendingForwards = Maps.newConcurrentMap();

    // Runs ObjectiveInstaller tasks; created in activate(), shut down in deactivate().
    private ExecutorService executorService;

    // Policy string most recently passed to initPolicy().
    private String policy;
    // Composition tree per device; created in activate() and populated by initPolicy().
    private Map<DeviceId, FlowObjectiveCompositionTree> deviceCompositionTreeMap;
145
    /**
     * Component activation hook.
     *
     * Creates the installer thread pool, registers the store delegate and the
     * mastership/device listeners, sets up a pipeline handler for every device
     * currently reported by the device service, and creates the (initially
     * empty) per-device composition tree map.
     */
    @Activate
    protected void activate() {
        // Fixed pool of four threads that execute ObjectiveInstaller tasks.
        executorService = newFixedThreadPool(4, groupedThreads("onos/objective-installer", "%d"));
        flowObjectiveStore.setDelegate(delegate);
        mastershipService.addListener(mastershipListener);
        deviceService.addListener(deviceListener);
        deviceService.getDevices().forEach(device -> setupPipelineHandler(device.id()));
        // The map stays empty until initPolicy() is called.
        deviceCompositionTreeMap = Maps.newConcurrentMap();
        log.info("Started");
    }
156
    /**
     * Component deactivation hook.
     *
     * Unregisters the store delegate and both listeners, shuts down the
     * installer thread pool and clears the per-device caches and the
     * composition tree map.
     */
    @Deactivate
    protected void deactivate() {
        flowObjectiveStore.unsetDelegate(delegate);
        mastershipService.removeListener(mastershipListener);
        deviceService.removeListener(deviceListener);
        // shutdown() stops accepting new tasks; already submitted tasks may still run.
        executorService.shutdown();
        pipeliners.clear();
        driverHandlers.clear();
        deviceCompositionTreeMap.clear();
        log.info("Stopped");
    }
168
169 /**
170 * Task that passes the flow objective down to the driver. The task will
171 * make a few attempts to find the appropriate driver, then eventually give
172 * up and report an error if no suitable driver could be found.
173 */
174 private class ObjectiveInstaller implements Runnable {
175 private final DeviceId deviceId;
176 private final Objective objective;
177
178 private final int numAttempts;
179
180 public ObjectiveInstaller(DeviceId deviceId, Objective objective) {
181 this(deviceId, objective, 1);
182 }
183
184 public ObjectiveInstaller(DeviceId deviceId, Objective objective, int attemps) {
185 this.deviceId = checkNotNull(deviceId);
186 this.objective = checkNotNull(objective);
187 this.numAttempts = checkNotNull(attemps);
188 }
189
190 @Override
191 public void run() {
192 try {
193 Pipeliner pipeliner = getDevicePipeliner(deviceId);
194
195 if (pipeliner != null) {
196 if (objective instanceof NextObjective) {
197 pipeliner.next((NextObjective) objective);
198 } else if (objective instanceof ForwardingObjective) {
199 pipeliner.forward((ForwardingObjective) objective);
200 } else {
201 pipeliner.filter((FilteringObjective) objective);
202 }
203 } else if (numAttempts < INSTALL_RETRY_ATTEMPTS) {
204 Thread.sleep(INSTALL_RETRY_INTERVAL);
205 executorService.submit(new ObjectiveInstaller(deviceId, objective, numAttempts + 1));
206 } else {
207 // Otherwise we've tried a few times and failed, report an
208 // error back to the user.
209 objective.context().ifPresent(
210 c -> c.onError(objective, ObjectiveError.DEVICEMISSING));
211 }
212 } catch (Exception e) {
213 log.warn("Exception while installing flow objective", e);
214 }
215 }
216 }
217
218 @Override
219 public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
Changhoon Yoonb856b812015-08-10 03:47:19 +0900220 checkPermission(FLOWRULE_WRITE);
Xin Jin313708b2015-07-09 13:43:04 -0700221
222 List<FilteringObjective> filteringObjectives
223 = this.deviceCompositionTreeMap.get(deviceId).updateFilter(filteringObjective);
224 for (FilteringObjective tmp : filteringObjectives) {
225 executorService.submit(new ObjectiveInstaller(deviceId, tmp));
226 }
227 }
228
229 @Override
230 public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
Changhoon Yoonb856b812015-08-10 03:47:19 +0900231 checkPermission(FLOWRULE_WRITE);
Xin Jin313708b2015-07-09 13:43:04 -0700232
233 if (queueObjective(deviceId, forwardingObjective)) {
234 return;
235 }
236 List<ForwardingObjective> forwardingObjectives
237 = this.deviceCompositionTreeMap.get(deviceId).updateForward(forwardingObjective);
238 for (ForwardingObjective tmp : forwardingObjectives) {
239 executorService.submit(new ObjectiveInstaller(deviceId, tmp));
240 }
241 }
242
243 @Override
244 public void next(DeviceId deviceId, NextObjective nextObjective) {
Changhoon Yoonb856b812015-08-10 03:47:19 +0900245 checkPermission(FLOWRULE_WRITE);
Xin Jin313708b2015-07-09 13:43:04 -0700246
247 List<NextObjective> nextObjectives = this.deviceCompositionTreeMap.get(deviceId).updateNext(nextObjective);
248 for (NextObjective tmp : nextObjectives) {
249 executorService.submit(new ObjectiveInstaller(deviceId, tmp));
250 }
251 }
252
253 @Override
254 public int allocateNextId() {
Changhoon Yoonb856b812015-08-10 03:47:19 +0900255 checkPermission(FLOWRULE_WRITE);
Xin Jin313708b2015-07-09 13:43:04 -0700256
257 return flowObjectiveStore.allocateNextId();
258 }
259
260 private boolean queueObjective(DeviceId deviceId, ForwardingObjective fwd) {
261 if (fwd.nextId() != null &&
262 flowObjectiveStore.getNextGroup(fwd.nextId()) == null) {
263 log.trace("Queuing forwarding objective for nextId {}", fwd.nextId());
264 if (pendingForwards.putIfAbsent(fwd.nextId(),
265 Sets.newHashSet(new PendingNext(deviceId, fwd))) != null) {
266 Set<PendingNext> pending = pendingForwards.get(fwd.nextId());
267 pending.add(new PendingNext(deviceId, fwd));
268 }
269 return true;
270 }
271 return false;
272 }
273
274 @Override
275 public void initPolicy(String policy) {
276 this.policy = policy;
277 deviceService.getDevices().forEach(device ->
278 this.deviceCompositionTreeMap.put(device.id(), FlowObjectiveCompositionUtil.parsePolicyString(policy)));
279 log.info("Initialize policy {}", policy);
280 }
281
282 // Retrieves the device pipeline behaviour from the cache.
283 private Pipeliner getDevicePipeliner(DeviceId deviceId) {
284 Pipeliner pipeliner = pipeliners.get(deviceId);
285 return pipeliner;
286 }
287
288 private void setupPipelineHandler(DeviceId deviceId) {
289 if (defaultDriverService == null) {
290 // We're not ready to go to work yet.
291 return;
292 }
293
294 // Attempt to lookup the handler in the cache
295 DriverHandler handler = driverHandlers.get(deviceId);
296 if (handler == null) {
297 try {
298 // Otherwise create it and if it has pipeline behaviour, cache it
299 handler = driverService.createHandler(deviceId);
300 if (!handler.driver().hasBehaviour(Pipeliner.class)) {
301 log.warn("Pipeline behaviour not supported for device {}",
302 deviceId);
303 return;
304 }
305 } catch (ItemNotFoundException e) {
306 log.warn("No applicable driver for device {}", deviceId);
307 return;
308 }
309
310 driverHandlers.put(deviceId, handler);
311 }
312
313 // Always (re)initialize the pipeline behaviour
314 log.info("Driver {} bound to device {} ... initializing driver",
315 handler.driver().name(), deviceId);
316 Pipeliner pipeliner = handler.behaviour(Pipeliner.class);
317 pipeliner.init(deviceId, context);
318 pipeliners.putIfAbsent(deviceId, pipeliner);
319 }
320
321 // Triggers driver setup when the local node becomes a device master.
322 private class InnerMastershipListener implements MastershipListener {
323 @Override
324 public void event(MastershipEvent event) {
325 switch (event.type()) {
326 case MASTER_CHANGED:
327 log.debug("mastership changed on device {}", event.subject());
328 if (deviceService.isAvailable(event.subject())) {
329 setupPipelineHandler(event.subject());
330 }
331 break;
332 case BACKUPS_CHANGED:
333 break;
334 default:
335 break;
336 }
337 }
338 }
339
340 // Triggers driver setup when a device is (re)detected.
341 private class InnerDeviceListener implements DeviceListener {
342 @Override
343 public void event(DeviceEvent event) {
344 switch (event.type()) {
345 case DEVICE_ADDED:
346 case DEVICE_AVAILABILITY_CHANGED:
347 log.debug("Device either added or availability changed {}",
348 event.subject().id());
349 if (deviceService.isAvailable(event.subject().id())) {
350 log.debug("Device is now available {}", event.subject().id());
351 setupPipelineHandler(event.subject().id());
352 }
353 break;
354 case DEVICE_UPDATED:
355 break;
356 case DEVICE_REMOVED:
357 break;
358 case DEVICE_SUSPENDED:
359 break;
360 case PORT_ADDED:
361 break;
362 case PORT_UPDATED:
363 break;
364 case PORT_REMOVED:
365 break;
366 default:
367 break;
368 }
369 }
370 }
371
372 // Processing context for initializing pipeline driver behaviours.
373 private class InnerPipelineContext implements PipelinerContext {
374 @Override
375 public ServiceDirectory directory() {
376 return serviceDirectory;
377 }
378
379 @Override
380 public FlowObjectiveStore store() {
381 return flowObjectiveStore;
382 }
383 }
384
385 private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
386 @Override
387 public void notify(ObjectiveEvent event) {
388 log.debug("Received notification of obj event {}", event);
389 Set<PendingNext> pending = pendingForwards.remove(event.subject());
390
391 if (pending == null) {
392 log.debug("Nothing pending for this obj event");
393 return;
394 }
395
396 log.debug("Processing pending forwarding objectives {}", pending.size());
397
398 pending.forEach(p -> getDevicePipeliner(p.deviceId())
399 .forward(p.forwardingObjective()));
400
401 }
402 }
403
404 /**
405 * Data class used to hold a pending forwarding objective that could not
406 * be processed because the associated next object was not present.
407 */
408 private class PendingNext {
409 private final DeviceId deviceId;
410 private final ForwardingObjective fwd;
411
412 public PendingNext(DeviceId deviceId, ForwardingObjective fwd) {
413 this.deviceId = deviceId;
414 this.fwd = fwd;
415 }
416
417 public DeviceId deviceId() {
418 return deviceId;
419 }
420
421 public ForwardingObjective forwardingObjective() {
422 return fwd;
423 }
424 }
425
426 public static String forwardingObjectiveToString(ForwardingObjective forwardingObjective) {
427 String str = forwardingObjective.priority() + " ";
428 str += "selector( ";
429 for (Criterion criterion : forwardingObjective.selector().criteria()) {
430 str += criterion + " ";
431 }
432 str += ") treatment( ";
433 for (Instruction instruction : forwardingObjective.treatment().allInstructions()) {
434 str += instruction + " ";
435 }
436 str += ")";
437 return str;
438 }
439}