blob: 2041b5b919166f90a9f54609ae48054a56f9fcea [file] [log] [blame]
Xin Jin313708b2015-07-09 13:43:04 -07001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.net.flowobjective.impl.composition;
17
18import com.google.common.collect.Maps;
19import com.google.common.collect.Sets;
20import org.apache.felix.scr.annotations.Activate;
Xin Jin313708b2015-07-09 13:43:04 -070021import org.apache.felix.scr.annotations.Deactivate;
22import org.apache.felix.scr.annotations.Reference;
23import org.apache.felix.scr.annotations.ReferenceCardinality;
24import org.apache.felix.scr.annotations.Service;
25import org.onlab.osgi.DefaultServiceDirectory;
26import org.onlab.osgi.ServiceDirectory;
27import org.onlab.util.ItemNotFoundException;
28import org.onosproject.cluster.ClusterService;
Xin Jin313708b2015-07-09 13:43:04 -070029import org.onosproject.mastership.MastershipEvent;
30import org.onosproject.mastership.MastershipListener;
31import org.onosproject.mastership.MastershipService;
32import org.onosproject.net.DeviceId;
33import org.onosproject.net.behaviour.Pipeliner;
34import org.onosproject.net.behaviour.PipelinerContext;
35import org.onosproject.net.device.DeviceEvent;
36import org.onosproject.net.device.DeviceListener;
37import org.onosproject.net.device.DeviceService;
38import org.onosproject.net.driver.DefaultDriverProviderService;
39import org.onosproject.net.driver.DriverHandler;
40import org.onosproject.net.driver.DriverService;
41import org.onosproject.net.flow.FlowRuleService;
42import org.onosproject.net.flow.criteria.Criterion;
43import org.onosproject.net.flow.instructions.Instruction;
44import org.onosproject.net.flowobjective.FilteringObjective;
45import org.onosproject.net.flowobjective.FlowObjectiveService;
46import org.onosproject.net.flowobjective.FlowObjectiveStore;
47import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
48import org.onosproject.net.flowobjective.ForwardingObjective;
49import org.onosproject.net.flowobjective.NextObjective;
50import org.onosproject.net.flowobjective.Objective;
51import org.onosproject.net.flowobjective.ObjectiveError;
52import org.onosproject.net.flowobjective.ObjectiveEvent;
53import org.onosproject.net.group.GroupService;
54import org.slf4j.Logger;
55import org.slf4j.LoggerFactory;
56
57import java.util.List;
58import java.util.Map;
59import java.util.Set;
60import java.util.concurrent.ExecutorService;
61
62import static com.google.common.base.Preconditions.checkNotNull;
63import static java.util.concurrent.Executors.newFixedThreadPool;
64import static org.onlab.util.Tools.groupedThreads;
65import static org.onosproject.security.AppGuard.checkPermission;
Jian Li11599162016-01-15 15:46:16 -080066import static org.onosproject.security.AppPermission.Type.FLOWRULE_WRITE;
Xin Jin313708b2015-07-09 13:43:04 -070067
68
69/**
70 * Provides implementation of the flow objective programming service with composition feature.
71 *
72 * Note: This is an experimental, alternative implementation of the FlowObjectiveManager
73 * that supports composition. It can be enabled by setting the enable flag below to true,
74 * and you should also add "enabled = false" to the FlowObjectiveManager.
75 *
76 * The implementation relies a FlowObjectiveCompositionTree that is not yet distributed,
77 * so it will not have high availability and may break if device mastership changes.
78 * Therefore, it is safest to use this component in a single instance scenario.
79 * This comment will be removed when a distributed implementation is available.
80 */
Jian Li11599162016-01-15 15:46:16 -080081//@Component(immediate = true, enabled = false)
Xin Jin313708b2015-07-09 13:43:04 -070082@Service
83public class FlowObjectiveCompositionManager implements FlowObjectiveService {
84
85 public enum PolicyOperator {
86 Parallel,
87 Sequential,
88 Override,
89 Application
90 }
91
92 public static final int INSTALL_RETRY_ATTEMPTS = 5;
93 public static final long INSTALL_RETRY_INTERVAL = 1000; // ms
94
95 private final Logger log = LoggerFactory.getLogger(getClass());
96
97 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
98 protected DriverService driverService;
99
100 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
101 protected DeviceService deviceService;
102
103 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
104 protected MastershipService mastershipService;
105
106 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
107 protected ClusterService clusterService;
108
109 // Note: The following dependencies are added on behalf of the pipeline
110 // driver behaviours to assure these services are available for their
111 // initialization.
112 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
113 protected FlowRuleService flowRuleService;
114
115 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
116 protected GroupService groupService;
117
118 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
119 protected FlowObjectiveStore flowObjectiveStore;
120
121 // Note: This must remain an optional dependency to allow re-install of default drivers.
122 // Note: For now disabled until we can move to OPTIONAL_UNARY dependency
123 // @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY, policy = ReferencePolicy.DYNAMIC)
124 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
125 protected DefaultDriverProviderService defaultDriverService;
126
127 private final FlowObjectiveStoreDelegate delegate = new InternalStoreDelegate();
128
129 private final Map<DeviceId, DriverHandler> driverHandlers = Maps.newConcurrentMap();
130 private final Map<DeviceId, Pipeliner> pipeliners = Maps.newConcurrentMap();
131
132 private final PipelinerContext context = new InnerPipelineContext();
133 private final MastershipListener mastershipListener = new InnerMastershipListener();
134 private final DeviceListener deviceListener = new InnerDeviceListener();
135
136 protected ServiceDirectory serviceDirectory = new DefaultServiceDirectory();
137
138 private Map<Integer, Set<PendingNext>> pendingForwards = Maps.newConcurrentMap();
139
140 private ExecutorService executorService;
141
142 private String policy;
143 private Map<DeviceId, FlowObjectiveCompositionTree> deviceCompositionTreeMap;
144
145 @Activate
146 protected void activate() {
147 executorService = newFixedThreadPool(4, groupedThreads("onos/objective-installer", "%d"));
148 flowObjectiveStore.setDelegate(delegate);
149 mastershipService.addListener(mastershipListener);
150 deviceService.addListener(deviceListener);
151 deviceService.getDevices().forEach(device -> setupPipelineHandler(device.id()));
152 deviceCompositionTreeMap = Maps.newConcurrentMap();
153 log.info("Started");
154 }
155
156 @Deactivate
157 protected void deactivate() {
158 flowObjectiveStore.unsetDelegate(delegate);
159 mastershipService.removeListener(mastershipListener);
160 deviceService.removeListener(deviceListener);
161 executorService.shutdown();
162 pipeliners.clear();
163 driverHandlers.clear();
164 deviceCompositionTreeMap.clear();
165 log.info("Stopped");
166 }
167
168 /**
169 * Task that passes the flow objective down to the driver. The task will
170 * make a few attempts to find the appropriate driver, then eventually give
171 * up and report an error if no suitable driver could be found.
172 */
173 private class ObjectiveInstaller implements Runnable {
174 private final DeviceId deviceId;
175 private final Objective objective;
176
177 private final int numAttempts;
178
179 public ObjectiveInstaller(DeviceId deviceId, Objective objective) {
180 this(deviceId, objective, 1);
181 }
182
183 public ObjectiveInstaller(DeviceId deviceId, Objective objective, int attemps) {
184 this.deviceId = checkNotNull(deviceId);
185 this.objective = checkNotNull(objective);
186 this.numAttempts = checkNotNull(attemps);
187 }
188
189 @Override
190 public void run() {
191 try {
192 Pipeliner pipeliner = getDevicePipeliner(deviceId);
193
194 if (pipeliner != null) {
195 if (objective instanceof NextObjective) {
196 pipeliner.next((NextObjective) objective);
197 } else if (objective instanceof ForwardingObjective) {
198 pipeliner.forward((ForwardingObjective) objective);
199 } else {
200 pipeliner.filter((FilteringObjective) objective);
201 }
202 } else if (numAttempts < INSTALL_RETRY_ATTEMPTS) {
203 Thread.sleep(INSTALL_RETRY_INTERVAL);
204 executorService.submit(new ObjectiveInstaller(deviceId, objective, numAttempts + 1));
205 } else {
206 // Otherwise we've tried a few times and failed, report an
207 // error back to the user.
208 objective.context().ifPresent(
Andrea Campanella1f8188d2016-02-29 13:24:54 -0800209 c -> c.onError(objective, ObjectiveError.NOPIPELINER));
Xin Jin313708b2015-07-09 13:43:04 -0700210 }
211 } catch (Exception e) {
212 log.warn("Exception while installing flow objective", e);
213 }
214 }
215 }
216
217 @Override
218 public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
Changhoon Yoonb856b812015-08-10 03:47:19 +0900219 checkPermission(FLOWRULE_WRITE);
Xin Jin313708b2015-07-09 13:43:04 -0700220
221 List<FilteringObjective> filteringObjectives
222 = this.deviceCompositionTreeMap.get(deviceId).updateFilter(filteringObjective);
223 for (FilteringObjective tmp : filteringObjectives) {
224 executorService.submit(new ObjectiveInstaller(deviceId, tmp));
225 }
226 }
227
228 @Override
229 public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
Changhoon Yoonb856b812015-08-10 03:47:19 +0900230 checkPermission(FLOWRULE_WRITE);
Xin Jin313708b2015-07-09 13:43:04 -0700231
232 if (queueObjective(deviceId, forwardingObjective)) {
233 return;
234 }
235 List<ForwardingObjective> forwardingObjectives
236 = this.deviceCompositionTreeMap.get(deviceId).updateForward(forwardingObjective);
237 for (ForwardingObjective tmp : forwardingObjectives) {
238 executorService.submit(new ObjectiveInstaller(deviceId, tmp));
239 }
240 }
241
242 @Override
243 public void next(DeviceId deviceId, NextObjective nextObjective) {
Changhoon Yoonb856b812015-08-10 03:47:19 +0900244 checkPermission(FLOWRULE_WRITE);
Xin Jin313708b2015-07-09 13:43:04 -0700245
246 List<NextObjective> nextObjectives = this.deviceCompositionTreeMap.get(deviceId).updateNext(nextObjective);
247 for (NextObjective tmp : nextObjectives) {
248 executorService.submit(new ObjectiveInstaller(deviceId, tmp));
249 }
250 }
251
252 @Override
253 public int allocateNextId() {
Changhoon Yoonb856b812015-08-10 03:47:19 +0900254 checkPermission(FLOWRULE_WRITE);
Xin Jin313708b2015-07-09 13:43:04 -0700255
256 return flowObjectiveStore.allocateNextId();
257 }
258
259 private boolean queueObjective(DeviceId deviceId, ForwardingObjective fwd) {
260 if (fwd.nextId() != null &&
261 flowObjectiveStore.getNextGroup(fwd.nextId()) == null) {
262 log.trace("Queuing forwarding objective for nextId {}", fwd.nextId());
263 if (pendingForwards.putIfAbsent(fwd.nextId(),
264 Sets.newHashSet(new PendingNext(deviceId, fwd))) != null) {
265 Set<PendingNext> pending = pendingForwards.get(fwd.nextId());
266 pending.add(new PendingNext(deviceId, fwd));
267 }
268 return true;
269 }
270 return false;
271 }
272
273 @Override
274 public void initPolicy(String policy) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900275 checkPermission(FLOWRULE_WRITE);
Xin Jin313708b2015-07-09 13:43:04 -0700276 this.policy = policy;
277 deviceService.getDevices().forEach(device ->
278 this.deviceCompositionTreeMap.put(device.id(), FlowObjectiveCompositionUtil.parsePolicyString(policy)));
279 log.info("Initialize policy {}", policy);
280 }
281
282 // Retrieves the device pipeline behaviour from the cache.
283 private Pipeliner getDevicePipeliner(DeviceId deviceId) {
284 Pipeliner pipeliner = pipeliners.get(deviceId);
285 return pipeliner;
286 }
287
288 private void setupPipelineHandler(DeviceId deviceId) {
289 if (defaultDriverService == null) {
290 // We're not ready to go to work yet.
291 return;
292 }
293
294 // Attempt to lookup the handler in the cache
295 DriverHandler handler = driverHandlers.get(deviceId);
296 if (handler == null) {
297 try {
298 // Otherwise create it and if it has pipeline behaviour, cache it
299 handler = driverService.createHandler(deviceId);
300 if (!handler.driver().hasBehaviour(Pipeliner.class)) {
301 log.warn("Pipeline behaviour not supported for device {}",
302 deviceId);
303 return;
304 }
305 } catch (ItemNotFoundException e) {
306 log.warn("No applicable driver for device {}", deviceId);
307 return;
308 }
309
310 driverHandlers.put(deviceId, handler);
311 }
312
313 // Always (re)initialize the pipeline behaviour
314 log.info("Driver {} bound to device {} ... initializing driver",
315 handler.driver().name(), deviceId);
316 Pipeliner pipeliner = handler.behaviour(Pipeliner.class);
317 pipeliner.init(deviceId, context);
318 pipeliners.putIfAbsent(deviceId, pipeliner);
319 }
320
321 // Triggers driver setup when the local node becomes a device master.
322 private class InnerMastershipListener implements MastershipListener {
323 @Override
324 public void event(MastershipEvent event) {
325 switch (event.type()) {
326 case MASTER_CHANGED:
327 log.debug("mastership changed on device {}", event.subject());
328 if (deviceService.isAvailable(event.subject())) {
329 setupPipelineHandler(event.subject());
330 }
331 break;
332 case BACKUPS_CHANGED:
333 break;
334 default:
335 break;
336 }
337 }
338 }
339
340 // Triggers driver setup when a device is (re)detected.
341 private class InnerDeviceListener implements DeviceListener {
342 @Override
343 public void event(DeviceEvent event) {
344 switch (event.type()) {
345 case DEVICE_ADDED:
346 case DEVICE_AVAILABILITY_CHANGED:
347 log.debug("Device either added or availability changed {}",
348 event.subject().id());
349 if (deviceService.isAvailable(event.subject().id())) {
350 log.debug("Device is now available {}", event.subject().id());
351 setupPipelineHandler(event.subject().id());
352 }
353 break;
354 case DEVICE_UPDATED:
355 break;
356 case DEVICE_REMOVED:
357 break;
358 case DEVICE_SUSPENDED:
359 break;
360 case PORT_ADDED:
361 break;
362 case PORT_UPDATED:
363 break;
364 case PORT_REMOVED:
365 break;
366 default:
367 break;
368 }
369 }
370 }
371
372 // Processing context for initializing pipeline driver behaviours.
373 private class InnerPipelineContext implements PipelinerContext {
374 @Override
375 public ServiceDirectory directory() {
376 return serviceDirectory;
377 }
378
379 @Override
380 public FlowObjectiveStore store() {
381 return flowObjectiveStore;
382 }
383 }
384
385 private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
386 @Override
387 public void notify(ObjectiveEvent event) {
388 log.debug("Received notification of obj event {}", event);
389 Set<PendingNext> pending = pendingForwards.remove(event.subject());
390
391 if (pending == null) {
392 log.debug("Nothing pending for this obj event");
393 return;
394 }
395
396 log.debug("Processing pending forwarding objectives {}", pending.size());
397
398 pending.forEach(p -> getDevicePipeliner(p.deviceId())
399 .forward(p.forwardingObjective()));
400
401 }
402 }
403
404 /**
405 * Data class used to hold a pending forwarding objective that could not
406 * be processed because the associated next object was not present.
407 */
408 private class PendingNext {
409 private final DeviceId deviceId;
410 private final ForwardingObjective fwd;
411
412 public PendingNext(DeviceId deviceId, ForwardingObjective fwd) {
413 this.deviceId = deviceId;
414 this.fwd = fwd;
415 }
416
417 public DeviceId deviceId() {
418 return deviceId;
419 }
420
421 public ForwardingObjective forwardingObjective() {
422 return fwd;
423 }
424 }
425
426 public static String forwardingObjectiveToString(ForwardingObjective forwardingObjective) {
427 String str = forwardingObjective.priority() + " ";
428 str += "selector( ";
429 for (Criterion criterion : forwardingObjective.selector().criteria()) {
430 str += criterion + " ";
431 }
432 str += ") treatment( ";
433 for (Instruction instruction : forwardingObjective.treatment().allInstructions()) {
434 str += instruction + " ";
435 }
436 str += ")";
437 return str;
438 }
439}