blob: a765648cb5551b30ef95905f84f086bb0c060291 [file] [log] [blame]
Xin Jin313708b2015-07-09 13:43:04 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
Xin Jin313708b2015-07-09 13:43:04 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.net.flowobjective.impl.composition;
17
Saurav Dasb5c236e2016-06-07 10:08:06 -070018import com.google.common.collect.ImmutableList;
Xin Jin313708b2015-07-09 13:43:04 -070019import com.google.common.collect.Maps;
20import com.google.common.collect.Sets;
21import org.apache.felix.scr.annotations.Activate;
Xin Jin313708b2015-07-09 13:43:04 -070022import org.apache.felix.scr.annotations.Deactivate;
23import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
25import org.apache.felix.scr.annotations.Service;
26import org.onlab.osgi.DefaultServiceDirectory;
27import org.onlab.osgi.ServiceDirectory;
28import org.onlab.util.ItemNotFoundException;
29import org.onosproject.cluster.ClusterService;
Xin Jin313708b2015-07-09 13:43:04 -070030import org.onosproject.mastership.MastershipEvent;
31import org.onosproject.mastership.MastershipListener;
32import org.onosproject.mastership.MastershipService;
33import org.onosproject.net.DeviceId;
34import org.onosproject.net.behaviour.Pipeliner;
35import org.onosproject.net.behaviour.PipelinerContext;
36import org.onosproject.net.device.DeviceEvent;
37import org.onosproject.net.device.DeviceListener;
38import org.onosproject.net.device.DeviceService;
Xin Jin313708b2015-07-09 13:43:04 -070039import org.onosproject.net.driver.DriverHandler;
40import org.onosproject.net.driver.DriverService;
41import org.onosproject.net.flow.FlowRuleService;
42import org.onosproject.net.flow.criteria.Criterion;
43import org.onosproject.net.flow.instructions.Instruction;
44import org.onosproject.net.flowobjective.FilteringObjective;
45import org.onosproject.net.flowobjective.FlowObjectiveService;
46import org.onosproject.net.flowobjective.FlowObjectiveStore;
47import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
48import org.onosproject.net.flowobjective.ForwardingObjective;
49import org.onosproject.net.flowobjective.NextObjective;
50import org.onosproject.net.flowobjective.Objective;
51import org.onosproject.net.flowobjective.ObjectiveError;
52import org.onosproject.net.flowobjective.ObjectiveEvent;
53import org.onosproject.net.group.GroupService;
54import org.slf4j.Logger;
55import org.slf4j.LoggerFactory;
56
57import java.util.List;
58import java.util.Map;
59import java.util.Set;
60import java.util.concurrent.ExecutorService;
61
62import static com.google.common.base.Preconditions.checkNotNull;
63import static java.util.concurrent.Executors.newFixedThreadPool;
64import static org.onlab.util.Tools.groupedThreads;
65import static org.onosproject.security.AppGuard.checkPermission;
Jian Li11599162016-01-15 15:46:16 -080066import static org.onosproject.security.AppPermission.Type.FLOWRULE_WRITE;
Xin Jin313708b2015-07-09 13:43:04 -070067
68
69/**
70 * Provides implementation of the flow objective programming service with composition feature.
71 *
72 * Note: This is an experimental, alternative implementation of the FlowObjectiveManager
73 * that supports composition. It can be enabled by setting the enable flag below to true,
74 * and you should also add "enabled = false" to the FlowObjectiveManager.
75 *
 * The implementation relies on a FlowObjectiveCompositionTree that is not yet distributed,
 * so it will not have high availability and may break if device mastership changes.
78 * Therefore, it is safest to use this component in a single instance scenario.
79 * This comment will be removed when a distributed implementation is available.
80 */
Jian Li11599162016-01-15 15:46:16 -080081//@Component(immediate = true, enabled = false)
Xin Jin313708b2015-07-09 13:43:04 -070082@Service
83public class FlowObjectiveCompositionManager implements FlowObjectiveService {
84
/**
 * Operator names exposed by the composition manager.
 * NOTE(review): presumably these are the operators a composition policy
 * may combine applications with; their semantics are defined outside this enum.
 */
public enum PolicyOperator {
    Parallel, Sequential, Override, Application
}
91
92 public static final int INSTALL_RETRY_ATTEMPTS = 5;
93 public static final long INSTALL_RETRY_INTERVAL = 1000; // ms
94
95 private final Logger log = LoggerFactory.getLogger(getClass());
96
97 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
98 protected DriverService driverService;
99
100 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
101 protected DeviceService deviceService;
102
103 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
104 protected MastershipService mastershipService;
105
106 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
107 protected ClusterService clusterService;
108
109 // Note: The following dependencies are added on behalf of the pipeline
110 // driver behaviours to assure these services are available for their
111 // initialization.
112 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
113 protected FlowRuleService flowRuleService;
114
115 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
116 protected GroupService groupService;
117
118 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
119 protected FlowObjectiveStore flowObjectiveStore;
120
Xin Jin313708b2015-07-09 13:43:04 -0700121 private final FlowObjectiveStoreDelegate delegate = new InternalStoreDelegate();
122
123 private final Map<DeviceId, DriverHandler> driverHandlers = Maps.newConcurrentMap();
124 private final Map<DeviceId, Pipeliner> pipeliners = Maps.newConcurrentMap();
125
126 private final PipelinerContext context = new InnerPipelineContext();
127 private final MastershipListener mastershipListener = new InnerMastershipListener();
128 private final DeviceListener deviceListener = new InnerDeviceListener();
129
130 protected ServiceDirectory serviceDirectory = new DefaultServiceDirectory();
131
132 private Map<Integer, Set<PendingNext>> pendingForwards = Maps.newConcurrentMap();
133
134 private ExecutorService executorService;
135
136 private String policy;
137 private Map<DeviceId, FlowObjectiveCompositionTree> deviceCompositionTreeMap;
138
139 @Activate
140 protected void activate() {
Yuta HIGUCHI1624df12016-07-21 16:54:33 -0700141 executorService = newFixedThreadPool(4, groupedThreads("onos/objective-installer", "%d", log));
Xin Jin313708b2015-07-09 13:43:04 -0700142 flowObjectiveStore.setDelegate(delegate);
143 mastershipService.addListener(mastershipListener);
144 deviceService.addListener(deviceListener);
145 deviceService.getDevices().forEach(device -> setupPipelineHandler(device.id()));
146 deviceCompositionTreeMap = Maps.newConcurrentMap();
147 log.info("Started");
148 }
149
150 @Deactivate
151 protected void deactivate() {
152 flowObjectiveStore.unsetDelegate(delegate);
153 mastershipService.removeListener(mastershipListener);
154 deviceService.removeListener(deviceListener);
155 executorService.shutdown();
156 pipeliners.clear();
157 driverHandlers.clear();
158 deviceCompositionTreeMap.clear();
159 log.info("Stopped");
160 }
161
162 /**
163 * Task that passes the flow objective down to the driver. The task will
164 * make a few attempts to find the appropriate driver, then eventually give
165 * up and report an error if no suitable driver could be found.
166 */
167 private class ObjectiveInstaller implements Runnable {
168 private final DeviceId deviceId;
169 private final Objective objective;
170
171 private final int numAttempts;
172
173 public ObjectiveInstaller(DeviceId deviceId, Objective objective) {
174 this(deviceId, objective, 1);
175 }
176
177 public ObjectiveInstaller(DeviceId deviceId, Objective objective, int attemps) {
178 this.deviceId = checkNotNull(deviceId);
179 this.objective = checkNotNull(objective);
Yuta HIGUCHI1edc36b2018-01-24 23:39:06 -0800180 this.numAttempts = attemps;
Xin Jin313708b2015-07-09 13:43:04 -0700181 }
182
183 @Override
184 public void run() {
185 try {
186 Pipeliner pipeliner = getDevicePipeliner(deviceId);
187
188 if (pipeliner != null) {
189 if (objective instanceof NextObjective) {
190 pipeliner.next((NextObjective) objective);
191 } else if (objective instanceof ForwardingObjective) {
192 pipeliner.forward((ForwardingObjective) objective);
193 } else {
194 pipeliner.filter((FilteringObjective) objective);
195 }
196 } else if (numAttempts < INSTALL_RETRY_ATTEMPTS) {
197 Thread.sleep(INSTALL_RETRY_INTERVAL);
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700198 executorService.execute(new ObjectiveInstaller(deviceId, objective, numAttempts + 1));
Xin Jin313708b2015-07-09 13:43:04 -0700199 } else {
200 // Otherwise we've tried a few times and failed, report an
201 // error back to the user.
202 objective.context().ifPresent(
Andrea Campanella1f8188d2016-02-29 13:24:54 -0800203 c -> c.onError(objective, ObjectiveError.NOPIPELINER));
Xin Jin313708b2015-07-09 13:43:04 -0700204 }
205 } catch (Exception e) {
206 log.warn("Exception while installing flow objective", e);
207 }
208 }
209 }
210
211 @Override
212 public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
Changhoon Yoonb856b812015-08-10 03:47:19 +0900213 checkPermission(FLOWRULE_WRITE);
Xin Jin313708b2015-07-09 13:43:04 -0700214
215 List<FilteringObjective> filteringObjectives
216 = this.deviceCompositionTreeMap.get(deviceId).updateFilter(filteringObjective);
217 for (FilteringObjective tmp : filteringObjectives) {
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700218 executorService.execute(new ObjectiveInstaller(deviceId, tmp));
Xin Jin313708b2015-07-09 13:43:04 -0700219 }
220 }
221
222 @Override
223 public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
Changhoon Yoonb856b812015-08-10 03:47:19 +0900224 checkPermission(FLOWRULE_WRITE);
Xin Jin313708b2015-07-09 13:43:04 -0700225
226 if (queueObjective(deviceId, forwardingObjective)) {
227 return;
228 }
229 List<ForwardingObjective> forwardingObjectives
230 = this.deviceCompositionTreeMap.get(deviceId).updateForward(forwardingObjective);
231 for (ForwardingObjective tmp : forwardingObjectives) {
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700232 executorService.execute(new ObjectiveInstaller(deviceId, tmp));
Xin Jin313708b2015-07-09 13:43:04 -0700233 }
234 }
235
236 @Override
237 public void next(DeviceId deviceId, NextObjective nextObjective) {
Changhoon Yoonb856b812015-08-10 03:47:19 +0900238 checkPermission(FLOWRULE_WRITE);
Xin Jin313708b2015-07-09 13:43:04 -0700239
240 List<NextObjective> nextObjectives = this.deviceCompositionTreeMap.get(deviceId).updateNext(nextObjective);
241 for (NextObjective tmp : nextObjectives) {
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700242 executorService.execute(new ObjectiveInstaller(deviceId, tmp));
Xin Jin313708b2015-07-09 13:43:04 -0700243 }
244 }
245
246 @Override
247 public int allocateNextId() {
Changhoon Yoonb856b812015-08-10 03:47:19 +0900248 checkPermission(FLOWRULE_WRITE);
Xin Jin313708b2015-07-09 13:43:04 -0700249
250 return flowObjectiveStore.allocateNextId();
251 }
252
253 private boolean queueObjective(DeviceId deviceId, ForwardingObjective fwd) {
254 if (fwd.nextId() != null &&
255 flowObjectiveStore.getNextGroup(fwd.nextId()) == null) {
256 log.trace("Queuing forwarding objective for nextId {}", fwd.nextId());
257 if (pendingForwards.putIfAbsent(fwd.nextId(),
258 Sets.newHashSet(new PendingNext(deviceId, fwd))) != null) {
259 Set<PendingNext> pending = pendingForwards.get(fwd.nextId());
260 pending.add(new PendingNext(deviceId, fwd));
261 }
262 return true;
263 }
264 return false;
265 }
266
267 @Override
268 public void initPolicy(String policy) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900269 checkPermission(FLOWRULE_WRITE);
Xin Jin313708b2015-07-09 13:43:04 -0700270 this.policy = policy;
271 deviceService.getDevices().forEach(device ->
272 this.deviceCompositionTreeMap.put(device.id(), FlowObjectiveCompositionUtil.parsePolicyString(policy)));
273 log.info("Initialize policy {}", policy);
274 }
275
276 // Retrieves the device pipeline behaviour from the cache.
277 private Pipeliner getDevicePipeliner(DeviceId deviceId) {
Thomas Vachuska11b99fc2017-04-27 12:51:04 -0700278 return pipeliners.get(deviceId);
Xin Jin313708b2015-07-09 13:43:04 -0700279 }
280
281 private void setupPipelineHandler(DeviceId deviceId) {
Xin Jin313708b2015-07-09 13:43:04 -0700282 // Attempt to lookup the handler in the cache
283 DriverHandler handler = driverHandlers.get(deviceId);
284 if (handler == null) {
285 try {
286 // Otherwise create it and if it has pipeline behaviour, cache it
287 handler = driverService.createHandler(deviceId);
288 if (!handler.driver().hasBehaviour(Pipeliner.class)) {
289 log.warn("Pipeline behaviour not supported for device {}",
290 deviceId);
291 return;
292 }
293 } catch (ItemNotFoundException e) {
294 log.warn("No applicable driver for device {}", deviceId);
295 return;
296 }
297
298 driverHandlers.put(deviceId, handler);
299 }
300
301 // Always (re)initialize the pipeline behaviour
302 log.info("Driver {} bound to device {} ... initializing driver",
303 handler.driver().name(), deviceId);
304 Pipeliner pipeliner = handler.behaviour(Pipeliner.class);
305 pipeliner.init(deviceId, context);
306 pipeliners.putIfAbsent(deviceId, pipeliner);
307 }
308
309 // Triggers driver setup when the local node becomes a device master.
310 private class InnerMastershipListener implements MastershipListener {
311 @Override
312 public void event(MastershipEvent event) {
313 switch (event.type()) {
314 case MASTER_CHANGED:
315 log.debug("mastership changed on device {}", event.subject());
316 if (deviceService.isAvailable(event.subject())) {
317 setupPipelineHandler(event.subject());
318 }
319 break;
320 case BACKUPS_CHANGED:
321 break;
322 default:
323 break;
324 }
325 }
326 }
327
328 // Triggers driver setup when a device is (re)detected.
329 private class InnerDeviceListener implements DeviceListener {
330 @Override
331 public void event(DeviceEvent event) {
332 switch (event.type()) {
333 case DEVICE_ADDED:
334 case DEVICE_AVAILABILITY_CHANGED:
335 log.debug("Device either added or availability changed {}",
336 event.subject().id());
337 if (deviceService.isAvailable(event.subject().id())) {
338 log.debug("Device is now available {}", event.subject().id());
339 setupPipelineHandler(event.subject().id());
340 }
341 break;
342 case DEVICE_UPDATED:
343 break;
344 case DEVICE_REMOVED:
345 break;
346 case DEVICE_SUSPENDED:
347 break;
348 case PORT_ADDED:
349 break;
350 case PORT_UPDATED:
351 break;
352 case PORT_REMOVED:
353 break;
354 default:
355 break;
356 }
357 }
358 }
359
360 // Processing context for initializing pipeline driver behaviours.
361 private class InnerPipelineContext implements PipelinerContext {
362 @Override
363 public ServiceDirectory directory() {
364 return serviceDirectory;
365 }
366
367 @Override
368 public FlowObjectiveStore store() {
369 return flowObjectiveStore;
370 }
371 }
372
373 private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
374 @Override
375 public void notify(ObjectiveEvent event) {
376 log.debug("Received notification of obj event {}", event);
377 Set<PendingNext> pending = pendingForwards.remove(event.subject());
378
379 if (pending == null) {
380 log.debug("Nothing pending for this obj event");
381 return;
382 }
383
384 log.debug("Processing pending forwarding objectives {}", pending.size());
385
386 pending.forEach(p -> getDevicePipeliner(p.deviceId())
387 .forward(p.forwardingObjective()));
388
389 }
390 }
391
392 /**
393 * Data class used to hold a pending forwarding objective that could not
394 * be processed because the associated next object was not present.
395 */
396 private class PendingNext {
397 private final DeviceId deviceId;
398 private final ForwardingObjective fwd;
399
400 public PendingNext(DeviceId deviceId, ForwardingObjective fwd) {
401 this.deviceId = deviceId;
402 this.fwd = fwd;
403 }
404
405 public DeviceId deviceId() {
406 return deviceId;
407 }
408
409 public ForwardingObjective forwardingObjective() {
410 return fwd;
411 }
412 }
413
414 public static String forwardingObjectiveToString(ForwardingObjective forwardingObjective) {
415 String str = forwardingObjective.priority() + " ";
416 str += "selector( ";
417 for (Criterion criterion : forwardingObjective.selector().criteria()) {
418 str += criterion + " ";
419 }
420 str += ") treatment( ";
421 for (Instruction instruction : forwardingObjective.treatment().allInstructions()) {
422 str += instruction + " ";
423 }
424 str += ")";
425 return str;
426 }
Saurav Das24431192016-03-07 19:13:00 -0800427
428 @Override
429 public List<String> getNextMappings() {
430 // TODO Implementation deferred as this is an experimental component.
Saurav Dasb5c236e2016-06-07 10:08:06 -0700431 return ImmutableList.of();
432 }
433
434 @Override
Saurav Das1547b3f2017-05-05 17:01:08 -0700435 public List<String> getPendingFlowObjectives() {
Saurav Dasb5c236e2016-06-07 10:08:06 -0700436 // TODO Implementation deferred as this is an experimental component.
437 return ImmutableList.of();
Saurav Das24431192016-03-07 19:13:00 -0800438 }
Saurav Das1547b3f2017-05-05 17:01:08 -0700439
440 @Override
441 public List<String> getPendingNexts() {
442 return ImmutableList.of();
443 }
Xin Jin313708b2015-07-09 13:43:04 -0700444}