blob: c33a2f4f5b1960fe063d5b384d72c4eecd188924 [file] [log] [blame]
Xin Jin313708b2015-07-09 13:43:04 -07001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2015-present Open Networking Laboratory
Xin Jin313708b2015-07-09 13:43:04 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.net.flowobjective.impl.composition;
17
Saurav Dasb5c236e2016-06-07 10:08:06 -070018import com.google.common.collect.ImmutableList;
Xin Jin313708b2015-07-09 13:43:04 -070019import com.google.common.collect.Maps;
20import com.google.common.collect.Sets;
21import org.apache.felix.scr.annotations.Activate;
Xin Jin313708b2015-07-09 13:43:04 -070022import org.apache.felix.scr.annotations.Deactivate;
23import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
25import org.apache.felix.scr.annotations.Service;
26import org.onlab.osgi.DefaultServiceDirectory;
27import org.onlab.osgi.ServiceDirectory;
28import org.onlab.util.ItemNotFoundException;
29import org.onosproject.cluster.ClusterService;
Xin Jin313708b2015-07-09 13:43:04 -070030import org.onosproject.mastership.MastershipEvent;
31import org.onosproject.mastership.MastershipListener;
32import org.onosproject.mastership.MastershipService;
33import org.onosproject.net.DeviceId;
34import org.onosproject.net.behaviour.Pipeliner;
35import org.onosproject.net.behaviour.PipelinerContext;
36import org.onosproject.net.device.DeviceEvent;
37import org.onosproject.net.device.DeviceListener;
38import org.onosproject.net.device.DeviceService;
39import org.onosproject.net.driver.DefaultDriverProviderService;
40import org.onosproject.net.driver.DriverHandler;
41import org.onosproject.net.driver.DriverService;
42import org.onosproject.net.flow.FlowRuleService;
43import org.onosproject.net.flow.criteria.Criterion;
44import org.onosproject.net.flow.instructions.Instruction;
45import org.onosproject.net.flowobjective.FilteringObjective;
46import org.onosproject.net.flowobjective.FlowObjectiveService;
47import org.onosproject.net.flowobjective.FlowObjectiveStore;
48import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
49import org.onosproject.net.flowobjective.ForwardingObjective;
50import org.onosproject.net.flowobjective.NextObjective;
51import org.onosproject.net.flowobjective.Objective;
52import org.onosproject.net.flowobjective.ObjectiveError;
53import org.onosproject.net.flowobjective.ObjectiveEvent;
54import org.onosproject.net.group.GroupService;
55import org.slf4j.Logger;
56import org.slf4j.LoggerFactory;
57
58import java.util.List;
59import java.util.Map;
60import java.util.Set;
61import java.util.concurrent.ExecutorService;
62
63import static com.google.common.base.Preconditions.checkNotNull;
64import static java.util.concurrent.Executors.newFixedThreadPool;
65import static org.onlab.util.Tools.groupedThreads;
66import static org.onosproject.security.AppGuard.checkPermission;
Jian Li11599162016-01-15 15:46:16 -080067import static org.onosproject.security.AppPermission.Type.FLOWRULE_WRITE;
Xin Jin313708b2015-07-09 13:43:04 -070068
69
/**
 * Provides implementation of the flow objective programming service with composition feature.
 *
 * Note: This is an experimental, alternative implementation of the FlowObjectiveManager
 * that supports composition. It can be enabled by setting the enable flag below to true,
 * and you should also add "enabled = false" to the FlowObjectiveManager.
 *
 * The implementation relies on a FlowObjectiveCompositionTree that is not yet distributed,
 * so it will not have high availability and may break if device mastership changes.
 * Therefore, it is safest to use this component in a single instance scenario.
 * This comment will be removed when a distributed implementation is available.
 */
Jian Li11599162016-01-15 15:46:16 -080082//@Component(immediate = true, enabled = false)
Xin Jin313708b2015-07-09 13:43:04 -070083@Service
84public class FlowObjectiveCompositionManager implements FlowObjectiveService {
85
/**
 * Operators that can appear in a flow objective composition policy.
 * The declaration order is significant for {@code ordinal()} and must be kept.
 */
public enum PolicyOperator {
    Parallel, Sequential, Override, Application;
}
92
93 public static final int INSTALL_RETRY_ATTEMPTS = 5;
94 public static final long INSTALL_RETRY_INTERVAL = 1000; // ms
95
96 private final Logger log = LoggerFactory.getLogger(getClass());
97
98 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
99 protected DriverService driverService;
100
101 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
102 protected DeviceService deviceService;
103
104 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
105 protected MastershipService mastershipService;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
108 protected ClusterService clusterService;
109
110 // Note: The following dependencies are added on behalf of the pipeline
111 // driver behaviours to assure these services are available for their
112 // initialization.
113 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
114 protected FlowRuleService flowRuleService;
115
116 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
117 protected GroupService groupService;
118
119 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
120 protected FlowObjectiveStore flowObjectiveStore;
121
122 // Note: This must remain an optional dependency to allow re-install of default drivers.
123 // Note: For now disabled until we can move to OPTIONAL_UNARY dependency
124 // @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY, policy = ReferencePolicy.DYNAMIC)
125 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
126 protected DefaultDriverProviderService defaultDriverService;
127
128 private final FlowObjectiveStoreDelegate delegate = new InternalStoreDelegate();
129
130 private final Map<DeviceId, DriverHandler> driverHandlers = Maps.newConcurrentMap();
131 private final Map<DeviceId, Pipeliner> pipeliners = Maps.newConcurrentMap();
132
133 private final PipelinerContext context = new InnerPipelineContext();
134 private final MastershipListener mastershipListener = new InnerMastershipListener();
135 private final DeviceListener deviceListener = new InnerDeviceListener();
136
137 protected ServiceDirectory serviceDirectory = new DefaultServiceDirectory();
138
139 private Map<Integer, Set<PendingNext>> pendingForwards = Maps.newConcurrentMap();
140
141 private ExecutorService executorService;
142
143 private String policy;
144 private Map<DeviceId, FlowObjectiveCompositionTree> deviceCompositionTreeMap;
145
146 @Activate
147 protected void activate() {
Yuta HIGUCHI1624df12016-07-21 16:54:33 -0700148 executorService = newFixedThreadPool(4, groupedThreads("onos/objective-installer", "%d", log));
Xin Jin313708b2015-07-09 13:43:04 -0700149 flowObjectiveStore.setDelegate(delegate);
150 mastershipService.addListener(mastershipListener);
151 deviceService.addListener(deviceListener);
152 deviceService.getDevices().forEach(device -> setupPipelineHandler(device.id()));
153 deviceCompositionTreeMap = Maps.newConcurrentMap();
154 log.info("Started");
155 }
156
157 @Deactivate
158 protected void deactivate() {
159 flowObjectiveStore.unsetDelegate(delegate);
160 mastershipService.removeListener(mastershipListener);
161 deviceService.removeListener(deviceListener);
162 executorService.shutdown();
163 pipeliners.clear();
164 driverHandlers.clear();
165 deviceCompositionTreeMap.clear();
166 log.info("Stopped");
167 }
168
169 /**
170 * Task that passes the flow objective down to the driver. The task will
171 * make a few attempts to find the appropriate driver, then eventually give
172 * up and report an error if no suitable driver could be found.
173 */
174 private class ObjectiveInstaller implements Runnable {
175 private final DeviceId deviceId;
176 private final Objective objective;
177
178 private final int numAttempts;
179
180 public ObjectiveInstaller(DeviceId deviceId, Objective objective) {
181 this(deviceId, objective, 1);
182 }
183
184 public ObjectiveInstaller(DeviceId deviceId, Objective objective, int attemps) {
185 this.deviceId = checkNotNull(deviceId);
186 this.objective = checkNotNull(objective);
187 this.numAttempts = checkNotNull(attemps);
188 }
189
190 @Override
191 public void run() {
192 try {
193 Pipeliner pipeliner = getDevicePipeliner(deviceId);
194
195 if (pipeliner != null) {
196 if (objective instanceof NextObjective) {
197 pipeliner.next((NextObjective) objective);
198 } else if (objective instanceof ForwardingObjective) {
199 pipeliner.forward((ForwardingObjective) objective);
200 } else {
201 pipeliner.filter((FilteringObjective) objective);
202 }
203 } else if (numAttempts < INSTALL_RETRY_ATTEMPTS) {
204 Thread.sleep(INSTALL_RETRY_INTERVAL);
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700205 executorService.execute(new ObjectiveInstaller(deviceId, objective, numAttempts + 1));
Xin Jin313708b2015-07-09 13:43:04 -0700206 } else {
207 // Otherwise we've tried a few times and failed, report an
208 // error back to the user.
209 objective.context().ifPresent(
Andrea Campanella1f8188d2016-02-29 13:24:54 -0800210 c -> c.onError(objective, ObjectiveError.NOPIPELINER));
Xin Jin313708b2015-07-09 13:43:04 -0700211 }
212 } catch (Exception e) {
213 log.warn("Exception while installing flow objective", e);
214 }
215 }
216 }
217
218 @Override
219 public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
Changhoon Yoonb856b812015-08-10 03:47:19 +0900220 checkPermission(FLOWRULE_WRITE);
Xin Jin313708b2015-07-09 13:43:04 -0700221
222 List<FilteringObjective> filteringObjectives
223 = this.deviceCompositionTreeMap.get(deviceId).updateFilter(filteringObjective);
224 for (FilteringObjective tmp : filteringObjectives) {
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700225 executorService.execute(new ObjectiveInstaller(deviceId, tmp));
Xin Jin313708b2015-07-09 13:43:04 -0700226 }
227 }
228
229 @Override
230 public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
Changhoon Yoonb856b812015-08-10 03:47:19 +0900231 checkPermission(FLOWRULE_WRITE);
Xin Jin313708b2015-07-09 13:43:04 -0700232
233 if (queueObjective(deviceId, forwardingObjective)) {
234 return;
235 }
236 List<ForwardingObjective> forwardingObjectives
237 = this.deviceCompositionTreeMap.get(deviceId).updateForward(forwardingObjective);
238 for (ForwardingObjective tmp : forwardingObjectives) {
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700239 executorService.execute(new ObjectiveInstaller(deviceId, tmp));
Xin Jin313708b2015-07-09 13:43:04 -0700240 }
241 }
242
243 @Override
244 public void next(DeviceId deviceId, NextObjective nextObjective) {
Changhoon Yoonb856b812015-08-10 03:47:19 +0900245 checkPermission(FLOWRULE_WRITE);
Xin Jin313708b2015-07-09 13:43:04 -0700246
247 List<NextObjective> nextObjectives = this.deviceCompositionTreeMap.get(deviceId).updateNext(nextObjective);
248 for (NextObjective tmp : nextObjectives) {
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700249 executorService.execute(new ObjectiveInstaller(deviceId, tmp));
Xin Jin313708b2015-07-09 13:43:04 -0700250 }
251 }
252
253 @Override
254 public int allocateNextId() {
Changhoon Yoonb856b812015-08-10 03:47:19 +0900255 checkPermission(FLOWRULE_WRITE);
Xin Jin313708b2015-07-09 13:43:04 -0700256
257 return flowObjectiveStore.allocateNextId();
258 }
259
260 private boolean queueObjective(DeviceId deviceId, ForwardingObjective fwd) {
261 if (fwd.nextId() != null &&
262 flowObjectiveStore.getNextGroup(fwd.nextId()) == null) {
263 log.trace("Queuing forwarding objective for nextId {}", fwd.nextId());
264 if (pendingForwards.putIfAbsent(fwd.nextId(),
265 Sets.newHashSet(new PendingNext(deviceId, fwd))) != null) {
266 Set<PendingNext> pending = pendingForwards.get(fwd.nextId());
267 pending.add(new PendingNext(deviceId, fwd));
268 }
269 return true;
270 }
271 return false;
272 }
273
274 @Override
275 public void initPolicy(String policy) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900276 checkPermission(FLOWRULE_WRITE);
Xin Jin313708b2015-07-09 13:43:04 -0700277 this.policy = policy;
278 deviceService.getDevices().forEach(device ->
279 this.deviceCompositionTreeMap.put(device.id(), FlowObjectiveCompositionUtil.parsePolicyString(policy)));
280 log.info("Initialize policy {}", policy);
281 }
282
283 // Retrieves the device pipeline behaviour from the cache.
284 private Pipeliner getDevicePipeliner(DeviceId deviceId) {
285 Pipeliner pipeliner = pipeliners.get(deviceId);
286 return pipeliner;
287 }
288
289 private void setupPipelineHandler(DeviceId deviceId) {
290 if (defaultDriverService == null) {
291 // We're not ready to go to work yet.
292 return;
293 }
294
295 // Attempt to lookup the handler in the cache
296 DriverHandler handler = driverHandlers.get(deviceId);
297 if (handler == null) {
298 try {
299 // Otherwise create it and if it has pipeline behaviour, cache it
300 handler = driverService.createHandler(deviceId);
301 if (!handler.driver().hasBehaviour(Pipeliner.class)) {
302 log.warn("Pipeline behaviour not supported for device {}",
303 deviceId);
304 return;
305 }
306 } catch (ItemNotFoundException e) {
307 log.warn("No applicable driver for device {}", deviceId);
308 return;
309 }
310
311 driverHandlers.put(deviceId, handler);
312 }
313
314 // Always (re)initialize the pipeline behaviour
315 log.info("Driver {} bound to device {} ... initializing driver",
316 handler.driver().name(), deviceId);
317 Pipeliner pipeliner = handler.behaviour(Pipeliner.class);
318 pipeliner.init(deviceId, context);
319 pipeliners.putIfAbsent(deviceId, pipeliner);
320 }
321
322 // Triggers driver setup when the local node becomes a device master.
323 private class InnerMastershipListener implements MastershipListener {
324 @Override
325 public void event(MastershipEvent event) {
326 switch (event.type()) {
327 case MASTER_CHANGED:
328 log.debug("mastership changed on device {}", event.subject());
329 if (deviceService.isAvailable(event.subject())) {
330 setupPipelineHandler(event.subject());
331 }
332 break;
333 case BACKUPS_CHANGED:
334 break;
335 default:
336 break;
337 }
338 }
339 }
340
341 // Triggers driver setup when a device is (re)detected.
342 private class InnerDeviceListener implements DeviceListener {
343 @Override
344 public void event(DeviceEvent event) {
345 switch (event.type()) {
346 case DEVICE_ADDED:
347 case DEVICE_AVAILABILITY_CHANGED:
348 log.debug("Device either added or availability changed {}",
349 event.subject().id());
350 if (deviceService.isAvailable(event.subject().id())) {
351 log.debug("Device is now available {}", event.subject().id());
352 setupPipelineHandler(event.subject().id());
353 }
354 break;
355 case DEVICE_UPDATED:
356 break;
357 case DEVICE_REMOVED:
358 break;
359 case DEVICE_SUSPENDED:
360 break;
361 case PORT_ADDED:
362 break;
363 case PORT_UPDATED:
364 break;
365 case PORT_REMOVED:
366 break;
367 default:
368 break;
369 }
370 }
371 }
372
373 // Processing context for initializing pipeline driver behaviours.
374 private class InnerPipelineContext implements PipelinerContext {
375 @Override
376 public ServiceDirectory directory() {
377 return serviceDirectory;
378 }
379
380 @Override
381 public FlowObjectiveStore store() {
382 return flowObjectiveStore;
383 }
384 }
385
386 private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
387 @Override
388 public void notify(ObjectiveEvent event) {
389 log.debug("Received notification of obj event {}", event);
390 Set<PendingNext> pending = pendingForwards.remove(event.subject());
391
392 if (pending == null) {
393 log.debug("Nothing pending for this obj event");
394 return;
395 }
396
397 log.debug("Processing pending forwarding objectives {}", pending.size());
398
399 pending.forEach(p -> getDevicePipeliner(p.deviceId())
400 .forward(p.forwardingObjective()));
401
402 }
403 }
404
405 /**
406 * Data class used to hold a pending forwarding objective that could not
407 * be processed because the associated next object was not present.
408 */
409 private class PendingNext {
410 private final DeviceId deviceId;
411 private final ForwardingObjective fwd;
412
413 public PendingNext(DeviceId deviceId, ForwardingObjective fwd) {
414 this.deviceId = deviceId;
415 this.fwd = fwd;
416 }
417
418 public DeviceId deviceId() {
419 return deviceId;
420 }
421
422 public ForwardingObjective forwardingObjective() {
423 return fwd;
424 }
425 }
426
427 public static String forwardingObjectiveToString(ForwardingObjective forwardingObjective) {
428 String str = forwardingObjective.priority() + " ";
429 str += "selector( ";
430 for (Criterion criterion : forwardingObjective.selector().criteria()) {
431 str += criterion + " ";
432 }
433 str += ") treatment( ";
434 for (Instruction instruction : forwardingObjective.treatment().allInstructions()) {
435 str += instruction + " ";
436 }
437 str += ")";
438 return str;
439 }
Saurav Das24431192016-03-07 19:13:00 -0800440
441 @Override
442 public List<String> getNextMappings() {
443 // TODO Implementation deferred as this is an experimental component.
Saurav Dasb5c236e2016-06-07 10:08:06 -0700444 return ImmutableList.of();
445 }
446
447 @Override
448 public List<String> getPendingNexts() {
449 // TODO Implementation deferred as this is an experimental component.
450 return ImmutableList.of();
Saurav Das24431192016-03-07 19:13:00 -0800451 }
Xin Jin313708b2015-07-09 13:43:04 -0700452}