blob: f54b60499cec16066fbcebac012d48ed28198db6 [file] [log] [blame]
Xin Jin313708b2015-07-09 13:43:04 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
Xin Jin313708b2015-07-09 13:43:04 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.net.flowobjective.impl.composition;
17
Saurav Dasb5c236e2016-06-07 10:08:06 -070018import com.google.common.collect.ImmutableList;
Harshada Chaundkar91908af2019-07-03 16:27:45 +000019import com.google.common.collect.ImmutableMap;
Xin Jin313708b2015-07-09 13:43:04 -070020import com.google.common.collect.Maps;
21import com.google.common.collect.Sets;
22import org.apache.felix.scr.annotations.Activate;
Xin Jin313708b2015-07-09 13:43:04 -070023import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
27import org.onlab.osgi.DefaultServiceDirectory;
28import org.onlab.osgi.ServiceDirectory;
29import org.onlab.util.ItemNotFoundException;
30import org.onosproject.cluster.ClusterService;
Xin Jin313708b2015-07-09 13:43:04 -070031import org.onosproject.mastership.MastershipEvent;
32import org.onosproject.mastership.MastershipListener;
33import org.onosproject.mastership.MastershipService;
Xin Jin313708b2015-07-09 13:43:04 -070034import org.onosproject.net.behaviour.Pipeliner;
35import org.onosproject.net.behaviour.PipelinerContext;
36import org.onosproject.net.device.DeviceEvent;
37import org.onosproject.net.device.DeviceListener;
38import org.onosproject.net.device.DeviceService;
Xin Jin313708b2015-07-09 13:43:04 -070039import org.onosproject.net.driver.DriverHandler;
40import org.onosproject.net.driver.DriverService;
41import org.onosproject.net.flow.FlowRuleService;
42import org.onosproject.net.flow.criteria.Criterion;
43import org.onosproject.net.flow.instructions.Instruction;
44import org.onosproject.net.flowobjective.FilteringObjective;
45import org.onosproject.net.flowobjective.FlowObjectiveService;
46import org.onosproject.net.flowobjective.FlowObjectiveStore;
47import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
48import org.onosproject.net.flowobjective.ForwardingObjective;
49import org.onosproject.net.flowobjective.NextObjective;
50import org.onosproject.net.flowobjective.Objective;
51import org.onosproject.net.flowobjective.ObjectiveError;
52import org.onosproject.net.flowobjective.ObjectiveEvent;
53import org.onosproject.net.group.GroupService;
54import org.slf4j.Logger;
55import org.slf4j.LoggerFactory;
56
57import java.util.List;
58import java.util.Map;
59import java.util.Set;
60import java.util.concurrent.ExecutorService;
Harshada Chaundkar91908af2019-07-03 16:27:45 +000061import org.apache.commons.lang3.tuple.Pair;
62import org.onosproject.net.DeviceId;
Xin Jin313708b2015-07-09 13:43:04 -070063
64import static com.google.common.base.Preconditions.checkNotNull;
65import static java.util.concurrent.Executors.newFixedThreadPool;
66import static org.onlab.util.Tools.groupedThreads;
67import static org.onosproject.security.AppGuard.checkPermission;
Jian Li11599162016-01-15 15:46:16 -080068import static org.onosproject.security.AppPermission.Type.FLOWRULE_WRITE;
Xin Jin313708b2015-07-09 13:43:04 -070069
70
71/**
72 * Provides implementation of the flow objective programming service with composition feature.
73 *
74 * Note: This is an experimental, alternative implementation of the FlowObjectiveManager
75 * that supports composition. It can be enabled by setting the enable flag below to true,
76 * and you should also add "enabled = false" to the FlowObjectiveManager.
77 *
78 * The implementation relies a FlowObjectiveCompositionTree that is not yet distributed,
79 * so it will not have high availability and may break if device mastership changes.
80 * Therefore, it is safest to use this component in a single instance scenario.
81 * This comment will be removed when a distributed implementation is available.
82 */
Jian Li11599162016-01-15 15:46:16 -080083//@Component(immediate = true, enabled = false)
Xin Jin313708b2015-07-09 13:43:04 -070084@Service
85public class FlowObjectiveCompositionManager implements FlowObjectiveService {
86
87 public enum PolicyOperator {
88 Parallel,
89 Sequential,
90 Override,
91 Application
92 }
93
94 public static final int INSTALL_RETRY_ATTEMPTS = 5;
95 public static final long INSTALL_RETRY_INTERVAL = 1000; // ms
96
97 private final Logger log = LoggerFactory.getLogger(getClass());
98
99 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
100 protected DriverService driverService;
101
102 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
103 protected DeviceService deviceService;
104
105 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
106 protected MastershipService mastershipService;
107
108 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
109 protected ClusterService clusterService;
110
111 // Note: The following dependencies are added on behalf of the pipeline
112 // driver behaviours to assure these services are available for their
113 // initialization.
114 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
115 protected FlowRuleService flowRuleService;
116
117 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
118 protected GroupService groupService;
119
120 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
121 protected FlowObjectiveStore flowObjectiveStore;
122
Xin Jin313708b2015-07-09 13:43:04 -0700123 private final FlowObjectiveStoreDelegate delegate = new InternalStoreDelegate();
124
125 private final Map<DeviceId, DriverHandler> driverHandlers = Maps.newConcurrentMap();
126 private final Map<DeviceId, Pipeliner> pipeliners = Maps.newConcurrentMap();
127
128 private final PipelinerContext context = new InnerPipelineContext();
129 private final MastershipListener mastershipListener = new InnerMastershipListener();
130 private final DeviceListener deviceListener = new InnerDeviceListener();
131
132 protected ServiceDirectory serviceDirectory = new DefaultServiceDirectory();
133
134 private Map<Integer, Set<PendingNext>> pendingForwards = Maps.newConcurrentMap();
135
136 private ExecutorService executorService;
137
138 private String policy;
139 private Map<DeviceId, FlowObjectiveCompositionTree> deviceCompositionTreeMap;
140
141 @Activate
142 protected void activate() {
Yuta HIGUCHI1624df12016-07-21 16:54:33 -0700143 executorService = newFixedThreadPool(4, groupedThreads("onos/objective-installer", "%d", log));
Xin Jin313708b2015-07-09 13:43:04 -0700144 flowObjectiveStore.setDelegate(delegate);
145 mastershipService.addListener(mastershipListener);
146 deviceService.addListener(deviceListener);
147 deviceService.getDevices().forEach(device -> setupPipelineHandler(device.id()));
148 deviceCompositionTreeMap = Maps.newConcurrentMap();
149 log.info("Started");
150 }
151
152 @Deactivate
153 protected void deactivate() {
154 flowObjectiveStore.unsetDelegate(delegate);
155 mastershipService.removeListener(mastershipListener);
156 deviceService.removeListener(deviceListener);
157 executorService.shutdown();
158 pipeliners.clear();
159 driverHandlers.clear();
160 deviceCompositionTreeMap.clear();
161 log.info("Stopped");
162 }
163
164 /**
165 * Task that passes the flow objective down to the driver. The task will
166 * make a few attempts to find the appropriate driver, then eventually give
167 * up and report an error if no suitable driver could be found.
168 */
169 private class ObjectiveInstaller implements Runnable {
170 private final DeviceId deviceId;
171 private final Objective objective;
172
173 private final int numAttempts;
174
175 public ObjectiveInstaller(DeviceId deviceId, Objective objective) {
176 this(deviceId, objective, 1);
177 }
178
179 public ObjectiveInstaller(DeviceId deviceId, Objective objective, int attemps) {
180 this.deviceId = checkNotNull(deviceId);
181 this.objective = checkNotNull(objective);
Yuta HIGUCHIfbd9ae92018-01-24 23:39:06 -0800182 this.numAttempts = attemps;
Xin Jin313708b2015-07-09 13:43:04 -0700183 }
184
185 @Override
186 public void run() {
187 try {
188 Pipeliner pipeliner = getDevicePipeliner(deviceId);
189
190 if (pipeliner != null) {
191 if (objective instanceof NextObjective) {
192 pipeliner.next((NextObjective) objective);
193 } else if (objective instanceof ForwardingObjective) {
194 pipeliner.forward((ForwardingObjective) objective);
195 } else {
196 pipeliner.filter((FilteringObjective) objective);
197 }
198 } else if (numAttempts < INSTALL_RETRY_ATTEMPTS) {
199 Thread.sleep(INSTALL_RETRY_INTERVAL);
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700200 executorService.execute(new ObjectiveInstaller(deviceId, objective, numAttempts + 1));
Xin Jin313708b2015-07-09 13:43:04 -0700201 } else {
202 // Otherwise we've tried a few times and failed, report an
203 // error back to the user.
204 objective.context().ifPresent(
Andrea Campanella1f8188d2016-02-29 13:24:54 -0800205 c -> c.onError(objective, ObjectiveError.NOPIPELINER));
Xin Jin313708b2015-07-09 13:43:04 -0700206 }
207 } catch (Exception e) {
208 log.warn("Exception while installing flow objective", e);
209 }
210 }
211 }
212
213 @Override
214 public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
Changhoon Yoonb856b812015-08-10 03:47:19 +0900215 checkPermission(FLOWRULE_WRITE);
Xin Jin313708b2015-07-09 13:43:04 -0700216
217 List<FilteringObjective> filteringObjectives
218 = this.deviceCompositionTreeMap.get(deviceId).updateFilter(filteringObjective);
219 for (FilteringObjective tmp : filteringObjectives) {
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700220 executorService.execute(new ObjectiveInstaller(deviceId, tmp));
Xin Jin313708b2015-07-09 13:43:04 -0700221 }
222 }
223
224 @Override
225 public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
Changhoon Yoonb856b812015-08-10 03:47:19 +0900226 checkPermission(FLOWRULE_WRITE);
Xin Jin313708b2015-07-09 13:43:04 -0700227
228 if (queueObjective(deviceId, forwardingObjective)) {
229 return;
230 }
231 List<ForwardingObjective> forwardingObjectives
232 = this.deviceCompositionTreeMap.get(deviceId).updateForward(forwardingObjective);
233 for (ForwardingObjective tmp : forwardingObjectives) {
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700234 executorService.execute(new ObjectiveInstaller(deviceId, tmp));
Xin Jin313708b2015-07-09 13:43:04 -0700235 }
236 }
237
238 @Override
239 public void next(DeviceId deviceId, NextObjective nextObjective) {
Changhoon Yoonb856b812015-08-10 03:47:19 +0900240 checkPermission(FLOWRULE_WRITE);
Xin Jin313708b2015-07-09 13:43:04 -0700241
242 List<NextObjective> nextObjectives = this.deviceCompositionTreeMap.get(deviceId).updateNext(nextObjective);
243 for (NextObjective tmp : nextObjectives) {
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700244 executorService.execute(new ObjectiveInstaller(deviceId, tmp));
Xin Jin313708b2015-07-09 13:43:04 -0700245 }
246 }
247
248 @Override
249 public int allocateNextId() {
Changhoon Yoonb856b812015-08-10 03:47:19 +0900250 checkPermission(FLOWRULE_WRITE);
Xin Jin313708b2015-07-09 13:43:04 -0700251
252 return flowObjectiveStore.allocateNextId();
253 }
254
255 private boolean queueObjective(DeviceId deviceId, ForwardingObjective fwd) {
256 if (fwd.nextId() != null &&
257 flowObjectiveStore.getNextGroup(fwd.nextId()) == null) {
258 log.trace("Queuing forwarding objective for nextId {}", fwd.nextId());
259 if (pendingForwards.putIfAbsent(fwd.nextId(),
260 Sets.newHashSet(new PendingNext(deviceId, fwd))) != null) {
261 Set<PendingNext> pending = pendingForwards.get(fwd.nextId());
262 pending.add(new PendingNext(deviceId, fwd));
263 }
264 return true;
265 }
266 return false;
267 }
268
269 @Override
270 public void initPolicy(String policy) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900271 checkPermission(FLOWRULE_WRITE);
Xin Jin313708b2015-07-09 13:43:04 -0700272 this.policy = policy;
273 deviceService.getDevices().forEach(device ->
274 this.deviceCompositionTreeMap.put(device.id(), FlowObjectiveCompositionUtil.parsePolicyString(policy)));
275 log.info("Initialize policy {}", policy);
276 }
277
278 // Retrieves the device pipeline behaviour from the cache.
279 private Pipeliner getDevicePipeliner(DeviceId deviceId) {
Thomas Vachuska11b99fc2017-04-27 12:51:04 -0700280 return pipeliners.get(deviceId);
Xin Jin313708b2015-07-09 13:43:04 -0700281 }
282
283 private void setupPipelineHandler(DeviceId deviceId) {
Xin Jin313708b2015-07-09 13:43:04 -0700284 // Attempt to lookup the handler in the cache
285 DriverHandler handler = driverHandlers.get(deviceId);
286 if (handler == null) {
287 try {
288 // Otherwise create it and if it has pipeline behaviour, cache it
289 handler = driverService.createHandler(deviceId);
290 if (!handler.driver().hasBehaviour(Pipeliner.class)) {
291 log.warn("Pipeline behaviour not supported for device {}",
292 deviceId);
293 return;
294 }
295 } catch (ItemNotFoundException e) {
296 log.warn("No applicable driver for device {}", deviceId);
297 return;
298 }
299
300 driverHandlers.put(deviceId, handler);
301 }
302
303 // Always (re)initialize the pipeline behaviour
304 log.info("Driver {} bound to device {} ... initializing driver",
305 handler.driver().name(), deviceId);
306 Pipeliner pipeliner = handler.behaviour(Pipeliner.class);
307 pipeliner.init(deviceId, context);
308 pipeliners.putIfAbsent(deviceId, pipeliner);
309 }
310
311 // Triggers driver setup when the local node becomes a device master.
312 private class InnerMastershipListener implements MastershipListener {
313 @Override
314 public void event(MastershipEvent event) {
315 switch (event.type()) {
316 case MASTER_CHANGED:
317 log.debug("mastership changed on device {}", event.subject());
318 if (deviceService.isAvailable(event.subject())) {
319 setupPipelineHandler(event.subject());
320 }
321 break;
322 case BACKUPS_CHANGED:
323 break;
324 default:
325 break;
326 }
327 }
328 }
329
330 // Triggers driver setup when a device is (re)detected.
331 private class InnerDeviceListener implements DeviceListener {
332 @Override
333 public void event(DeviceEvent event) {
334 switch (event.type()) {
335 case DEVICE_ADDED:
336 case DEVICE_AVAILABILITY_CHANGED:
337 log.debug("Device either added or availability changed {}",
338 event.subject().id());
339 if (deviceService.isAvailable(event.subject().id())) {
340 log.debug("Device is now available {}", event.subject().id());
341 setupPipelineHandler(event.subject().id());
342 }
343 break;
344 case DEVICE_UPDATED:
345 break;
346 case DEVICE_REMOVED:
347 break;
348 case DEVICE_SUSPENDED:
349 break;
350 case PORT_ADDED:
351 break;
352 case PORT_UPDATED:
353 break;
354 case PORT_REMOVED:
355 break;
356 default:
357 break;
358 }
359 }
360 }
361
362 // Processing context for initializing pipeline driver behaviours.
363 private class InnerPipelineContext implements PipelinerContext {
364 @Override
365 public ServiceDirectory directory() {
366 return serviceDirectory;
367 }
368
369 @Override
370 public FlowObjectiveStore store() {
371 return flowObjectiveStore;
372 }
373 }
374
375 private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
376 @Override
377 public void notify(ObjectiveEvent event) {
378 log.debug("Received notification of obj event {}", event);
379 Set<PendingNext> pending = pendingForwards.remove(event.subject());
380
381 if (pending == null) {
382 log.debug("Nothing pending for this obj event");
383 return;
384 }
385
386 log.debug("Processing pending forwarding objectives {}", pending.size());
387
388 pending.forEach(p -> getDevicePipeliner(p.deviceId())
389 .forward(p.forwardingObjective()));
390
391 }
392 }
393
394 /**
395 * Data class used to hold a pending forwarding objective that could not
396 * be processed because the associated next object was not present.
397 */
398 private class PendingNext {
399 private final DeviceId deviceId;
400 private final ForwardingObjective fwd;
401
402 public PendingNext(DeviceId deviceId, ForwardingObjective fwd) {
403 this.deviceId = deviceId;
404 this.fwd = fwd;
405 }
406
407 public DeviceId deviceId() {
408 return deviceId;
409 }
410
411 public ForwardingObjective forwardingObjective() {
412 return fwd;
413 }
414 }
415
416 public static String forwardingObjectiveToString(ForwardingObjective forwardingObjective) {
417 String str = forwardingObjective.priority() + " ";
418 str += "selector( ";
419 for (Criterion criterion : forwardingObjective.selector().criteria()) {
420 str += criterion + " ";
421 }
422 str += ") treatment( ";
423 for (Instruction instruction : forwardingObjective.treatment().allInstructions()) {
424 str += instruction + " ";
425 }
426 str += ")";
427 return str;
428 }
Saurav Das24431192016-03-07 19:13:00 -0800429
430 @Override
431 public List<String> getNextMappings() {
432 // TODO Implementation deferred as this is an experimental component.
Saurav Dasb5c236e2016-06-07 10:08:06 -0700433 return ImmutableList.of();
434 }
435
436 @Override
Harshada Chaundkar91908af2019-07-03 16:27:45 +0000437 public Map<Pair<Integer, DeviceId>, List<String>> getNextMappingsChain() {
438 return ImmutableMap.of();
439 }
440
441 @Override
Saurav Das1547b3f2017-05-05 17:01:08 -0700442 public List<String> getPendingFlowObjectives() {
Saurav Dasb5c236e2016-06-07 10:08:06 -0700443 // TODO Implementation deferred as this is an experimental component.
444 return ImmutableList.of();
Saurav Das24431192016-03-07 19:13:00 -0800445 }
Xin Jin313708b2015-07-09 13:43:04 -0700446}