blob: ed8f4e441771dd4372f3bb097b40ca95200679f8 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.net.flowobjective.impl.composition;
17
18import com.google.common.collect.Maps;
19import com.google.common.collect.Sets;
20import org.apache.felix.scr.annotations.Activate;
Xin Jin313708b2015-07-09 13:43:04 -070021import org.apache.felix.scr.annotations.Deactivate;
22import org.apache.felix.scr.annotations.Reference;
23import org.apache.felix.scr.annotations.ReferenceCardinality;
24import org.apache.felix.scr.annotations.Service;
25import org.onlab.osgi.DefaultServiceDirectory;
26import org.onlab.osgi.ServiceDirectory;
27import org.onlab.util.ItemNotFoundException;
28import org.onosproject.cluster.ClusterService;
Xin Jin313708b2015-07-09 13:43:04 -070029import org.onosproject.mastership.MastershipEvent;
30import org.onosproject.mastership.MastershipListener;
31import org.onosproject.mastership.MastershipService;
32import org.onosproject.net.DeviceId;
33import org.onosproject.net.behaviour.Pipeliner;
34import org.onosproject.net.behaviour.PipelinerContext;
35import org.onosproject.net.device.DeviceEvent;
36import org.onosproject.net.device.DeviceListener;
37import org.onosproject.net.device.DeviceService;
38import org.onosproject.net.driver.DefaultDriverProviderService;
39import org.onosproject.net.driver.DriverHandler;
40import org.onosproject.net.driver.DriverService;
41import org.onosproject.net.flow.FlowRuleService;
42import org.onosproject.net.flow.criteria.Criterion;
43import org.onosproject.net.flow.instructions.Instruction;
44import org.onosproject.net.flowobjective.FilteringObjective;
45import org.onosproject.net.flowobjective.FlowObjectiveService;
46import org.onosproject.net.flowobjective.FlowObjectiveStore;
47import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
48import org.onosproject.net.flowobjective.ForwardingObjective;
49import org.onosproject.net.flowobjective.NextObjective;
50import org.onosproject.net.flowobjective.Objective;
51import org.onosproject.net.flowobjective.ObjectiveError;
52import org.onosproject.net.flowobjective.ObjectiveEvent;
53import org.onosproject.net.group.GroupService;
54import org.slf4j.Logger;
55import org.slf4j.LoggerFactory;
56
57import java.util.List;
58import java.util.Map;
59import java.util.Set;
60import java.util.concurrent.ExecutorService;
61
62import static com.google.common.base.Preconditions.checkNotNull;
63import static java.util.concurrent.Executors.newFixedThreadPool;
64import static org.onlab.util.Tools.groupedThreads;
65import static org.onosproject.security.AppGuard.checkPermission;
Jian Li11599162016-01-15 15:46:16 -080066import static org.onosproject.security.AppPermission.Type.FLOWRULE_WRITE;
Xin Jin313708b2015-07-09 13:43:04 -070067
68
69/**
70 * Provides implementation of the flow objective programming service with composition feature.
71 *
72 * Note: This is an experimental, alternative implementation of the FlowObjectiveManager
73 * that supports composition. It can be enabled by setting the enable flag below to true,
74 * and you should also add "enabled = false" to the FlowObjectiveManager.
75 *
76 * The implementation relies a FlowObjectiveCompositionTree that is not yet distributed,
77 * so it will not have high availability and may break if device mastership changes.
78 * Therefore, it is safest to use this component in a single instance scenario.
79 * This comment will be removed when a distributed implementation is available.
80 */
Jian Li11599162016-01-15 15:46:16 -080081//@Component(immediate = true, enabled = false)
Xin Jin313708b2015-07-09 13:43:04 -070082@Service
83public class FlowObjectiveCompositionManager implements FlowObjectiveService {
84
85 public enum PolicyOperator {
86 Parallel,
87 Sequential,
88 Override,
89 Application
90 }
91
92 public static final int INSTALL_RETRY_ATTEMPTS = 5;
93 public static final long INSTALL_RETRY_INTERVAL = 1000; // ms
94
95 private final Logger log = LoggerFactory.getLogger(getClass());
96
97 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
98 protected DriverService driverService;
99
100 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
101 protected DeviceService deviceService;
102
103 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
104 protected MastershipService mastershipService;
105
106 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
107 protected ClusterService clusterService;
108
109 // Note: The following dependencies are added on behalf of the pipeline
110 // driver behaviours to assure these services are available for their
111 // initialization.
112 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
113 protected FlowRuleService flowRuleService;
114
115 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
116 protected GroupService groupService;
117
118 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
119 protected FlowObjectiveStore flowObjectiveStore;
120
121 // Note: This must remain an optional dependency to allow re-install of default drivers.
122 // Note: For now disabled until we can move to OPTIONAL_UNARY dependency
123 // @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY, policy = ReferencePolicy.DYNAMIC)
124 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
125 protected DefaultDriverProviderService defaultDriverService;
126
127 private final FlowObjectiveStoreDelegate delegate = new InternalStoreDelegate();
128
129 private final Map<DeviceId, DriverHandler> driverHandlers = Maps.newConcurrentMap();
130 private final Map<DeviceId, Pipeliner> pipeliners = Maps.newConcurrentMap();
131
132 private final PipelinerContext context = new InnerPipelineContext();
133 private final MastershipListener mastershipListener = new InnerMastershipListener();
134 private final DeviceListener deviceListener = new InnerDeviceListener();
135
136 protected ServiceDirectory serviceDirectory = new DefaultServiceDirectory();
137
138 private Map<Integer, Set<PendingNext>> pendingForwards = Maps.newConcurrentMap();
139
140 private ExecutorService executorService;
141
142 private String policy;
143 private Map<DeviceId, FlowObjectiveCompositionTree> deviceCompositionTreeMap;
144
145 @Activate
146 protected void activate() {
147 executorService = newFixedThreadPool(4, groupedThreads("onos/objective-installer", "%d"));
148 flowObjectiveStore.setDelegate(delegate);
149 mastershipService.addListener(mastershipListener);
150 deviceService.addListener(deviceListener);
151 deviceService.getDevices().forEach(device -> setupPipelineHandler(device.id()));
152 deviceCompositionTreeMap = Maps.newConcurrentMap();
153 log.info("Started");
154 }
155
156 @Deactivate
157 protected void deactivate() {
158 flowObjectiveStore.unsetDelegate(delegate);
159 mastershipService.removeListener(mastershipListener);
160 deviceService.removeListener(deviceListener);
161 executorService.shutdown();
162 pipeliners.clear();
163 driverHandlers.clear();
164 deviceCompositionTreeMap.clear();
165 log.info("Stopped");
166 }
167
168 /**
169 * Task that passes the flow objective down to the driver. The task will
170 * make a few attempts to find the appropriate driver, then eventually give
171 * up and report an error if no suitable driver could be found.
172 */
173 private class ObjectiveInstaller implements Runnable {
174 private final DeviceId deviceId;
175 private final Objective objective;
176
177 private final int numAttempts;
178
179 public ObjectiveInstaller(DeviceId deviceId, Objective objective) {
180 this(deviceId, objective, 1);
181 }
182
183 public ObjectiveInstaller(DeviceId deviceId, Objective objective, int attemps) {
184 this.deviceId = checkNotNull(deviceId);
185 this.objective = checkNotNull(objective);
186 this.numAttempts = checkNotNull(attemps);
187 }
188
189 @Override
190 public void run() {
191 try {
192 Pipeliner pipeliner = getDevicePipeliner(deviceId);
193
194 if (pipeliner != null) {
195 if (objective instanceof NextObjective) {
196 pipeliner.next((NextObjective) objective);
197 } else if (objective instanceof ForwardingObjective) {
198 pipeliner.forward((ForwardingObjective) objective);
199 } else {
200 pipeliner.filter((FilteringObjective) objective);
201 }
202 } else if (numAttempts < INSTALL_RETRY_ATTEMPTS) {
203 Thread.sleep(INSTALL_RETRY_INTERVAL);
204 executorService.submit(new ObjectiveInstaller(deviceId, objective, numAttempts + 1));
205 } else {
206 // Otherwise we've tried a few times and failed, report an
207 // error back to the user.
208 objective.context().ifPresent(
209 c -> c.onError(objective, ObjectiveError.DEVICEMISSING));
210 }
211 } catch (Exception e) {
212 log.warn("Exception while installing flow objective", e);
213 }
214 }
215 }
216
217 @Override
218 public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
Changhoon Yoonb856b812015-08-10 03:47:19 +0900219 checkPermission(FLOWRULE_WRITE);
Xin Jin313708b2015-07-09 13:43:04 -0700220
221 List<FilteringObjective> filteringObjectives
222 = this.deviceCompositionTreeMap.get(deviceId).updateFilter(filteringObjective);
223 for (FilteringObjective tmp : filteringObjectives) {
224 executorService.submit(new ObjectiveInstaller(deviceId, tmp));
225 }
226 }
227
228 @Override
229 public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
Changhoon Yoonb856b812015-08-10 03:47:19 +0900230 checkPermission(FLOWRULE_WRITE);
Xin Jin313708b2015-07-09 13:43:04 -0700231
232 if (queueObjective(deviceId, forwardingObjective)) {
233 return;
234 }
235 List<ForwardingObjective> forwardingObjectives
236 = this.deviceCompositionTreeMap.get(deviceId).updateForward(forwardingObjective);
237 for (ForwardingObjective tmp : forwardingObjectives) {
238 executorService.submit(new ObjectiveInstaller(deviceId, tmp));
239 }
240 }
241
242 @Override
243 public void next(DeviceId deviceId, NextObjective nextObjective) {
Changhoon Yoonb856b812015-08-10 03:47:19 +0900244 checkPermission(FLOWRULE_WRITE);
Xin Jin313708b2015-07-09 13:43:04 -0700245
246 List<NextObjective> nextObjectives = this.deviceCompositionTreeMap.get(deviceId).updateNext(nextObjective);
247 for (NextObjective tmp : nextObjectives) {
248 executorService.submit(new ObjectiveInstaller(deviceId, tmp));
249 }
250 }
251
252 @Override
253 public int allocateNextId() {
Changhoon Yoonb856b812015-08-10 03:47:19 +0900254 checkPermission(FLOWRULE_WRITE);
Xin Jin313708b2015-07-09 13:43:04 -0700255
256 return flowObjectiveStore.allocateNextId();
257 }
258
259 private boolean queueObjective(DeviceId deviceId, ForwardingObjective fwd) {
260 if (fwd.nextId() != null &&
261 flowObjectiveStore.getNextGroup(fwd.nextId()) == null) {
262 log.trace("Queuing forwarding objective for nextId {}", fwd.nextId());
263 if (pendingForwards.putIfAbsent(fwd.nextId(),
264 Sets.newHashSet(new PendingNext(deviceId, fwd))) != null) {
265 Set<PendingNext> pending = pendingForwards.get(fwd.nextId());
266 pending.add(new PendingNext(deviceId, fwd));
267 }
268 return true;
269 }
270 return false;
271 }
272
273 @Override
274 public void initPolicy(String policy) {
275 this.policy = policy;
276 deviceService.getDevices().forEach(device ->
277 this.deviceCompositionTreeMap.put(device.id(), FlowObjectiveCompositionUtil.parsePolicyString(policy)));
278 log.info("Initialize policy {}", policy);
279 }
280
281 // Retrieves the device pipeline behaviour from the cache.
282 private Pipeliner getDevicePipeliner(DeviceId deviceId) {
283 Pipeliner pipeliner = pipeliners.get(deviceId);
284 return pipeliner;
285 }
286
287 private void setupPipelineHandler(DeviceId deviceId) {
288 if (defaultDriverService == null) {
289 // We're not ready to go to work yet.
290 return;
291 }
292
293 // Attempt to lookup the handler in the cache
294 DriverHandler handler = driverHandlers.get(deviceId);
295 if (handler == null) {
296 try {
297 // Otherwise create it and if it has pipeline behaviour, cache it
298 handler = driverService.createHandler(deviceId);
299 if (!handler.driver().hasBehaviour(Pipeliner.class)) {
300 log.warn("Pipeline behaviour not supported for device {}",
301 deviceId);
302 return;
303 }
304 } catch (ItemNotFoundException e) {
305 log.warn("No applicable driver for device {}", deviceId);
306 return;
307 }
308
309 driverHandlers.put(deviceId, handler);
310 }
311
312 // Always (re)initialize the pipeline behaviour
313 log.info("Driver {} bound to device {} ... initializing driver",
314 handler.driver().name(), deviceId);
315 Pipeliner pipeliner = handler.behaviour(Pipeliner.class);
316 pipeliner.init(deviceId, context);
317 pipeliners.putIfAbsent(deviceId, pipeliner);
318 }
319
320 // Triggers driver setup when the local node becomes a device master.
321 private class InnerMastershipListener implements MastershipListener {
322 @Override
323 public void event(MastershipEvent event) {
324 switch (event.type()) {
325 case MASTER_CHANGED:
326 log.debug("mastership changed on device {}", event.subject());
327 if (deviceService.isAvailable(event.subject())) {
328 setupPipelineHandler(event.subject());
329 }
330 break;
331 case BACKUPS_CHANGED:
332 break;
333 default:
334 break;
335 }
336 }
337 }
338
339 // Triggers driver setup when a device is (re)detected.
340 private class InnerDeviceListener implements DeviceListener {
341 @Override
342 public void event(DeviceEvent event) {
343 switch (event.type()) {
344 case DEVICE_ADDED:
345 case DEVICE_AVAILABILITY_CHANGED:
346 log.debug("Device either added or availability changed {}",
347 event.subject().id());
348 if (deviceService.isAvailable(event.subject().id())) {
349 log.debug("Device is now available {}", event.subject().id());
350 setupPipelineHandler(event.subject().id());
351 }
352 break;
353 case DEVICE_UPDATED:
354 break;
355 case DEVICE_REMOVED:
356 break;
357 case DEVICE_SUSPENDED:
358 break;
359 case PORT_ADDED:
360 break;
361 case PORT_UPDATED:
362 break;
363 case PORT_REMOVED:
364 break;
365 default:
366 break;
367 }
368 }
369 }
370
371 // Processing context for initializing pipeline driver behaviours.
372 private class InnerPipelineContext implements PipelinerContext {
373 @Override
374 public ServiceDirectory directory() {
375 return serviceDirectory;
376 }
377
378 @Override
379 public FlowObjectiveStore store() {
380 return flowObjectiveStore;
381 }
382 }
383
384 private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
385 @Override
386 public void notify(ObjectiveEvent event) {
387 log.debug("Received notification of obj event {}", event);
388 Set<PendingNext> pending = pendingForwards.remove(event.subject());
389
390 if (pending == null) {
391 log.debug("Nothing pending for this obj event");
392 return;
393 }
394
395 log.debug("Processing pending forwarding objectives {}", pending.size());
396
397 pending.forEach(p -> getDevicePipeliner(p.deviceId())
398 .forward(p.forwardingObjective()));
399
400 }
401 }
402
403 /**
404 * Data class used to hold a pending forwarding objective that could not
405 * be processed because the associated next object was not present.
406 */
407 private class PendingNext {
408 private final DeviceId deviceId;
409 private final ForwardingObjective fwd;
410
411 public PendingNext(DeviceId deviceId, ForwardingObjective fwd) {
412 this.deviceId = deviceId;
413 this.fwd = fwd;
414 }
415
416 public DeviceId deviceId() {
417 return deviceId;
418 }
419
420 public ForwardingObjective forwardingObjective() {
421 return fwd;
422 }
423 }
424
425 public static String forwardingObjectiveToString(ForwardingObjective forwardingObjective) {
426 String str = forwardingObjective.priority() + " ";
427 str += "selector( ";
428 for (Criterion criterion : forwardingObjective.selector().criteria()) {
429 str += criterion + " ";
430 }
431 str += ") treatment( ";
432 for (Instruction instruction : forwardingObjective.treatment().allInstructions()) {
433 str += instruction + " ";
434 }
435 str += ")";
436 return str;
437 }
438}