blob: b930d6b9614f4eaf39d5e5028c4ef583a1dd8577 [file] [log] [blame]
Xin Jin313708b2015-07-09 13:43:04 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
Xin Jin313708b2015-07-09 13:43:04 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.net.flowobjective.impl.composition;
17
Saurav Dasb5c236e2016-06-07 10:08:06 -070018import com.google.common.collect.ImmutableList;
Harshada Chaundkar5a198b02019-07-03 16:27:45 +000019import com.google.common.collect.ImmutableMap;
Xin Jin313708b2015-07-09 13:43:04 -070020import com.google.common.collect.Maps;
21import com.google.common.collect.Sets;
Xin Jin313708b2015-07-09 13:43:04 -070022import org.onlab.osgi.DefaultServiceDirectory;
23import org.onlab.osgi.ServiceDirectory;
24import org.onlab.util.ItemNotFoundException;
25import org.onosproject.cluster.ClusterService;
Daniele Moro607fd0b2021-07-19 22:39:22 +020026import org.onosproject.core.ApplicationId;
Xin Jin313708b2015-07-09 13:43:04 -070027import org.onosproject.mastership.MastershipEvent;
28import org.onosproject.mastership.MastershipListener;
29import org.onosproject.mastership.MastershipService;
Xin Jin313708b2015-07-09 13:43:04 -070030import org.onosproject.net.behaviour.Pipeliner;
31import org.onosproject.net.behaviour.PipelinerContext;
32import org.onosproject.net.device.DeviceEvent;
33import org.onosproject.net.device.DeviceListener;
34import org.onosproject.net.device.DeviceService;
Xin Jin313708b2015-07-09 13:43:04 -070035import org.onosproject.net.driver.DriverHandler;
36import org.onosproject.net.driver.DriverService;
37import org.onosproject.net.flow.FlowRuleService;
38import org.onosproject.net.flow.criteria.Criterion;
39import org.onosproject.net.flow.instructions.Instruction;
40import org.onosproject.net.flowobjective.FilteringObjective;
41import org.onosproject.net.flowobjective.FlowObjectiveService;
42import org.onosproject.net.flowobjective.FlowObjectiveStore;
43import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
44import org.onosproject.net.flowobjective.ForwardingObjective;
45import org.onosproject.net.flowobjective.NextObjective;
46import org.onosproject.net.flowobjective.Objective;
47import org.onosproject.net.flowobjective.ObjectiveError;
48import org.onosproject.net.flowobjective.ObjectiveEvent;
49import org.onosproject.net.group.GroupService;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070050import org.osgi.service.component.annotations.Activate;
51import org.osgi.service.component.annotations.Deactivate;
52import org.osgi.service.component.annotations.Reference;
53import org.osgi.service.component.annotations.ReferenceCardinality;
Xin Jin313708b2015-07-09 13:43:04 -070054import org.slf4j.Logger;
55import org.slf4j.LoggerFactory;
56
57import java.util.List;
58import java.util.Map;
59import java.util.Set;
60import java.util.concurrent.ExecutorService;
Harshada Chaundkar5a198b02019-07-03 16:27:45 +000061import org.apache.commons.lang3.tuple.Pair;
62import org.onosproject.net.DeviceId;
Xin Jin313708b2015-07-09 13:43:04 -070063
64import static com.google.common.base.Preconditions.checkNotNull;
65import static java.util.concurrent.Executors.newFixedThreadPool;
66import static org.onlab.util.Tools.groupedThreads;
67import static org.onosproject.security.AppGuard.checkPermission;
Jian Li11599162016-01-15 15:46:16 -080068import static org.onosproject.security.AppPermission.Type.FLOWRULE_WRITE;
Xin Jin313708b2015-07-09 13:43:04 -070069
70
71/**
72 * Provides implementation of the flow objective programming service with composition feature.
73 *
74 * Note: This is an experimental, alternative implementation of the FlowObjectiveManager
75 * that supports composition. It can be enabled by setting the enable flag below to true,
76 * and you should also add "enabled = false" to the FlowObjectiveManager.
77 *
78 * The implementation relies a FlowObjectiveCompositionTree that is not yet distributed,
79 * so it will not have high availability and may break if device mastership changes.
80 * Therefore, it is safest to use this component in a single instance scenario.
81 * This comment will be removed when a distributed implementation is available.
82 */
Jian Li11599162016-01-15 15:46:16 -080083//@Component(immediate = true, enabled = false)
Xin Jin313708b2015-07-09 13:43:04 -070084public class FlowObjectiveCompositionManager implements FlowObjectiveService {
85
86 public enum PolicyOperator {
87 Parallel,
88 Sequential,
89 Override,
90 Application
91 }
92
93 public static final int INSTALL_RETRY_ATTEMPTS = 5;
94 public static final long INSTALL_RETRY_INTERVAL = 1000; // ms
95
96 private final Logger log = LoggerFactory.getLogger(getClass());
97
Ray Milkeyd84f89b2018-08-17 14:54:17 -070098 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Xin Jin313708b2015-07-09 13:43:04 -070099 protected DriverService driverService;
100
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700101 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Xin Jin313708b2015-07-09 13:43:04 -0700102 protected DeviceService deviceService;
103
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700104 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Xin Jin313708b2015-07-09 13:43:04 -0700105 protected MastershipService mastershipService;
106
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700107 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Xin Jin313708b2015-07-09 13:43:04 -0700108 protected ClusterService clusterService;
109
110 // Note: The following dependencies are added on behalf of the pipeline
111 // driver behaviours to assure these services are available for their
112 // initialization.
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700113 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Xin Jin313708b2015-07-09 13:43:04 -0700114 protected FlowRuleService flowRuleService;
115
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700116 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Xin Jin313708b2015-07-09 13:43:04 -0700117 protected GroupService groupService;
118
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700119 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Xin Jin313708b2015-07-09 13:43:04 -0700120 protected FlowObjectiveStore flowObjectiveStore;
121
Xin Jin313708b2015-07-09 13:43:04 -0700122 private final FlowObjectiveStoreDelegate delegate = new InternalStoreDelegate();
123
124 private final Map<DeviceId, DriverHandler> driverHandlers = Maps.newConcurrentMap();
125 private final Map<DeviceId, Pipeliner> pipeliners = Maps.newConcurrentMap();
126
127 private final PipelinerContext context = new InnerPipelineContext();
128 private final MastershipListener mastershipListener = new InnerMastershipListener();
129 private final DeviceListener deviceListener = new InnerDeviceListener();
130
131 protected ServiceDirectory serviceDirectory = new DefaultServiceDirectory();
132
133 private Map<Integer, Set<PendingNext>> pendingForwards = Maps.newConcurrentMap();
134
135 private ExecutorService executorService;
136
137 private String policy;
138 private Map<DeviceId, FlowObjectiveCompositionTree> deviceCompositionTreeMap;
139
140 @Activate
141 protected void activate() {
Yuta HIGUCHI1624df12016-07-21 16:54:33 -0700142 executorService = newFixedThreadPool(4, groupedThreads("onos/objective-installer", "%d", log));
Xin Jin313708b2015-07-09 13:43:04 -0700143 flowObjectiveStore.setDelegate(delegate);
144 mastershipService.addListener(mastershipListener);
145 deviceService.addListener(deviceListener);
146 deviceService.getDevices().forEach(device -> setupPipelineHandler(device.id()));
147 deviceCompositionTreeMap = Maps.newConcurrentMap();
148 log.info("Started");
149 }
150
151 @Deactivate
152 protected void deactivate() {
153 flowObjectiveStore.unsetDelegate(delegate);
154 mastershipService.removeListener(mastershipListener);
155 deviceService.removeListener(deviceListener);
156 executorService.shutdown();
157 pipeliners.clear();
158 driverHandlers.clear();
159 deviceCompositionTreeMap.clear();
160 log.info("Stopped");
161 }
162
163 /**
164 * Task that passes the flow objective down to the driver. The task will
165 * make a few attempts to find the appropriate driver, then eventually give
166 * up and report an error if no suitable driver could be found.
167 */
168 private class ObjectiveInstaller implements Runnable {
169 private final DeviceId deviceId;
170 private final Objective objective;
171
172 private final int numAttempts;
173
174 public ObjectiveInstaller(DeviceId deviceId, Objective objective) {
175 this(deviceId, objective, 1);
176 }
177
178 public ObjectiveInstaller(DeviceId deviceId, Objective objective, int attemps) {
179 this.deviceId = checkNotNull(deviceId);
180 this.objective = checkNotNull(objective);
Yuta HIGUCHIfbd9ae92018-01-24 23:39:06 -0800181 this.numAttempts = attemps;
Xin Jin313708b2015-07-09 13:43:04 -0700182 }
183
184 @Override
185 public void run() {
186 try {
187 Pipeliner pipeliner = getDevicePipeliner(deviceId);
188
189 if (pipeliner != null) {
190 if (objective instanceof NextObjective) {
191 pipeliner.next((NextObjective) objective);
192 } else if (objective instanceof ForwardingObjective) {
193 pipeliner.forward((ForwardingObjective) objective);
194 } else {
195 pipeliner.filter((FilteringObjective) objective);
196 }
197 } else if (numAttempts < INSTALL_RETRY_ATTEMPTS) {
198 Thread.sleep(INSTALL_RETRY_INTERVAL);
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700199 executorService.execute(new ObjectiveInstaller(deviceId, objective, numAttempts + 1));
Xin Jin313708b2015-07-09 13:43:04 -0700200 } else {
201 // Otherwise we've tried a few times and failed, report an
202 // error back to the user.
203 objective.context().ifPresent(
Andrea Campanella1f8188d2016-02-29 13:24:54 -0800204 c -> c.onError(objective, ObjectiveError.NOPIPELINER));
Xin Jin313708b2015-07-09 13:43:04 -0700205 }
206 } catch (Exception e) {
207 log.warn("Exception while installing flow objective", e);
208 }
209 }
210 }
211
212 @Override
213 public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
Changhoon Yoonb856b812015-08-10 03:47:19 +0900214 checkPermission(FLOWRULE_WRITE);
Xin Jin313708b2015-07-09 13:43:04 -0700215
216 List<FilteringObjective> filteringObjectives
217 = this.deviceCompositionTreeMap.get(deviceId).updateFilter(filteringObjective);
218 for (FilteringObjective tmp : filteringObjectives) {
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700219 executorService.execute(new ObjectiveInstaller(deviceId, tmp));
Xin Jin313708b2015-07-09 13:43:04 -0700220 }
221 }
222
223 @Override
224 public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
Changhoon Yoonb856b812015-08-10 03:47:19 +0900225 checkPermission(FLOWRULE_WRITE);
Xin Jin313708b2015-07-09 13:43:04 -0700226
227 if (queueObjective(deviceId, forwardingObjective)) {
228 return;
229 }
230 List<ForwardingObjective> forwardingObjectives
231 = this.deviceCompositionTreeMap.get(deviceId).updateForward(forwardingObjective);
232 for (ForwardingObjective tmp : forwardingObjectives) {
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700233 executorService.execute(new ObjectiveInstaller(deviceId, tmp));
Xin Jin313708b2015-07-09 13:43:04 -0700234 }
235 }
236
237 @Override
238 public void next(DeviceId deviceId, NextObjective nextObjective) {
Changhoon Yoonb856b812015-08-10 03:47:19 +0900239 checkPermission(FLOWRULE_WRITE);
Xin Jin313708b2015-07-09 13:43:04 -0700240
241 List<NextObjective> nextObjectives = this.deviceCompositionTreeMap.get(deviceId).updateNext(nextObjective);
242 for (NextObjective tmp : nextObjectives) {
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700243 executorService.execute(new ObjectiveInstaller(deviceId, tmp));
Xin Jin313708b2015-07-09 13:43:04 -0700244 }
245 }
246
247 @Override
248 public int allocateNextId() {
Changhoon Yoonb856b812015-08-10 03:47:19 +0900249 checkPermission(FLOWRULE_WRITE);
Xin Jin313708b2015-07-09 13:43:04 -0700250
251 return flowObjectiveStore.allocateNextId();
252 }
253
254 private boolean queueObjective(DeviceId deviceId, ForwardingObjective fwd) {
255 if (fwd.nextId() != null &&
256 flowObjectiveStore.getNextGroup(fwd.nextId()) == null) {
257 log.trace("Queuing forwarding objective for nextId {}", fwd.nextId());
258 if (pendingForwards.putIfAbsent(fwd.nextId(),
259 Sets.newHashSet(new PendingNext(deviceId, fwd))) != null) {
260 Set<PendingNext> pending = pendingForwards.get(fwd.nextId());
261 pending.add(new PendingNext(deviceId, fwd));
262 }
263 return true;
264 }
265 return false;
266 }
267
268 @Override
269 public void initPolicy(String policy) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900270 checkPermission(FLOWRULE_WRITE);
Xin Jin313708b2015-07-09 13:43:04 -0700271 this.policy = policy;
272 deviceService.getDevices().forEach(device ->
273 this.deviceCompositionTreeMap.put(device.id(), FlowObjectiveCompositionUtil.parsePolicyString(policy)));
274 log.info("Initialize policy {}", policy);
275 }
276
277 // Retrieves the device pipeline behaviour from the cache.
278 private Pipeliner getDevicePipeliner(DeviceId deviceId) {
Thomas Vachuska11b99fc2017-04-27 12:51:04 -0700279 return pipeliners.get(deviceId);
Xin Jin313708b2015-07-09 13:43:04 -0700280 }
281
282 private void setupPipelineHandler(DeviceId deviceId) {
Xin Jin313708b2015-07-09 13:43:04 -0700283 // Attempt to lookup the handler in the cache
284 DriverHandler handler = driverHandlers.get(deviceId);
285 if (handler == null) {
286 try {
287 // Otherwise create it and if it has pipeline behaviour, cache it
288 handler = driverService.createHandler(deviceId);
289 if (!handler.driver().hasBehaviour(Pipeliner.class)) {
290 log.warn("Pipeline behaviour not supported for device {}",
291 deviceId);
292 return;
293 }
294 } catch (ItemNotFoundException e) {
295 log.warn("No applicable driver for device {}", deviceId);
296 return;
297 }
298
299 driverHandlers.put(deviceId, handler);
300 }
301
302 // Always (re)initialize the pipeline behaviour
303 log.info("Driver {} bound to device {} ... initializing driver",
304 handler.driver().name(), deviceId);
305 Pipeliner pipeliner = handler.behaviour(Pipeliner.class);
306 pipeliner.init(deviceId, context);
307 pipeliners.putIfAbsent(deviceId, pipeliner);
308 }
309
310 // Triggers driver setup when the local node becomes a device master.
311 private class InnerMastershipListener implements MastershipListener {
312 @Override
313 public void event(MastershipEvent event) {
314 switch (event.type()) {
315 case MASTER_CHANGED:
316 log.debug("mastership changed on device {}", event.subject());
317 if (deviceService.isAvailable(event.subject())) {
318 setupPipelineHandler(event.subject());
319 }
320 break;
321 case BACKUPS_CHANGED:
322 break;
323 default:
324 break;
325 }
326 }
327 }
328
329 // Triggers driver setup when a device is (re)detected.
330 private class InnerDeviceListener implements DeviceListener {
331 @Override
332 public void event(DeviceEvent event) {
333 switch (event.type()) {
334 case DEVICE_ADDED:
335 case DEVICE_AVAILABILITY_CHANGED:
336 log.debug("Device either added or availability changed {}",
337 event.subject().id());
338 if (deviceService.isAvailable(event.subject().id())) {
339 log.debug("Device is now available {}", event.subject().id());
340 setupPipelineHandler(event.subject().id());
341 }
342 break;
343 case DEVICE_UPDATED:
344 break;
345 case DEVICE_REMOVED:
346 break;
347 case DEVICE_SUSPENDED:
348 break;
349 case PORT_ADDED:
350 break;
351 case PORT_UPDATED:
352 break;
353 case PORT_REMOVED:
354 break;
355 default:
356 break;
357 }
358 }
359 }
360
361 // Processing context for initializing pipeline driver behaviours.
362 private class InnerPipelineContext implements PipelinerContext {
363 @Override
364 public ServiceDirectory directory() {
365 return serviceDirectory;
366 }
367
368 @Override
369 public FlowObjectiveStore store() {
370 return flowObjectiveStore;
371 }
372 }
373
374 private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
375 @Override
376 public void notify(ObjectiveEvent event) {
377 log.debug("Received notification of obj event {}", event);
378 Set<PendingNext> pending = pendingForwards.remove(event.subject());
379
380 if (pending == null) {
381 log.debug("Nothing pending for this obj event");
382 return;
383 }
384
385 log.debug("Processing pending forwarding objectives {}", pending.size());
386
387 pending.forEach(p -> getDevicePipeliner(p.deviceId())
388 .forward(p.forwardingObjective()));
389
390 }
391 }
392
393 /**
394 * Data class used to hold a pending forwarding objective that could not
395 * be processed because the associated next object was not present.
396 */
397 private class PendingNext {
398 private final DeviceId deviceId;
399 private final ForwardingObjective fwd;
400
401 public PendingNext(DeviceId deviceId, ForwardingObjective fwd) {
402 this.deviceId = deviceId;
403 this.fwd = fwd;
404 }
405
406 public DeviceId deviceId() {
407 return deviceId;
408 }
409
410 public ForwardingObjective forwardingObjective() {
411 return fwd;
412 }
413 }
414
415 public static String forwardingObjectiveToString(ForwardingObjective forwardingObjective) {
416 String str = forwardingObjective.priority() + " ";
417 str += "selector( ";
418 for (Criterion criterion : forwardingObjective.selector().criteria()) {
419 str += criterion + " ";
420 }
421 str += ") treatment( ";
422 for (Instruction instruction : forwardingObjective.treatment().allInstructions()) {
423 str += instruction + " ";
424 }
425 str += ")";
426 return str;
427 }
Saurav Das24431192016-03-07 19:13:00 -0800428
429 @Override
430 public List<String> getNextMappings() {
431 // TODO Implementation deferred as this is an experimental component.
Saurav Dasb5c236e2016-06-07 10:08:06 -0700432 return ImmutableList.of();
433 }
434
435 @Override
Harshada Chaundkar5a198b02019-07-03 16:27:45 +0000436 public Map<Pair<Integer, DeviceId>, List<String>> getNextMappingsChain() {
437 return ImmutableMap.of();
438 }
439
440 @Override
Saurav Das1547b3f2017-05-05 17:01:08 -0700441 public List<String> getPendingFlowObjectives() {
Saurav Dasb5c236e2016-06-07 10:08:06 -0700442 // TODO Implementation deferred as this is an experimental component.
443 return ImmutableList.of();
Saurav Das24431192016-03-07 19:13:00 -0800444 }
Daniele Moro607fd0b2021-07-19 22:39:22 +0200445
446 @Override
447 public void purgeAll(DeviceId deviceId, ApplicationId appId) {
448 // TODO Implementation deferred as this is an experimental component.
449 }
Xin Jin313708b2015-07-09 13:43:04 -0700450}