blob: 227ec24de7fb0cf2acfa13e87d151a27aaacb209 [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.pi.demo.app.common;
18
19import com.google.common.collect.ImmutableList;
20import com.google.common.collect.ImmutableMap;
21import com.google.common.collect.ImmutableSet;
22import com.google.common.collect.Lists;
23import com.google.common.collect.Maps;
24import com.google.common.collect.Sets;
25import org.apache.felix.scr.annotations.Activate;
26import org.apache.felix.scr.annotations.Component;
27import org.apache.felix.scr.annotations.Deactivate;
28import org.apache.felix.scr.annotations.Reference;
29import org.apache.felix.scr.annotations.ReferenceCardinality;
30import org.onosproject.app.ApplicationAdminService;
31import org.onosproject.core.ApplicationId;
32import org.onosproject.core.CoreService;
33import org.onosproject.net.ConnectPoint;
34import org.onosproject.net.Device;
35import org.onosproject.net.DeviceId;
36import org.onosproject.net.Host;
37import org.onosproject.net.Port;
38import org.onosproject.net.device.DeviceEvent;
39import org.onosproject.net.device.DeviceListener;
40import org.onosproject.net.device.DeviceService;
41import org.onosproject.net.flow.DefaultFlowRule;
42import org.onosproject.net.flow.FlowRule;
43import org.onosproject.net.flow.FlowRuleOperations;
44import org.onosproject.net.flow.FlowRuleService;
wu5f6c5b82017-08-04 16:45:19 +080045import org.onosproject.net.host.HostService;
46import org.onosproject.net.pi.model.PiPipeconf;
47import org.onosproject.net.pi.model.PiPipelineInterpreter;
48import org.onosproject.net.pi.runtime.PiPipeconfService;
49import org.onosproject.net.pi.runtime.PiTableId;
50import org.onosproject.net.topology.Topology;
wu5f6c5b82017-08-04 16:45:19 +080051import org.onosproject.net.topology.TopologyGraph;
wu5f6c5b82017-08-04 16:45:19 +080052import org.onosproject.net.topology.TopologyService;
53import org.onosproject.net.topology.TopologyVertex;
54import org.slf4j.Logger;
55
56import java.util.Collection;
wu5f6c5b82017-08-04 16:45:19 +080057import java.util.List;
58import java.util.Map;
59import java.util.Set;
60import java.util.concurrent.ConcurrentMap;
61import java.util.concurrent.ExecutorService;
62import java.util.concurrent.Executors;
Carmelo Cascone3929cc82017-09-06 13:34:25 +020063import java.util.concurrent.ScheduledExecutorService;
wu5f6c5b82017-08-04 16:45:19 +080064import java.util.concurrent.TimeUnit;
65import java.util.concurrent.locks.Lock;
66import java.util.concurrent.locks.ReentrantLock;
67import java.util.stream.Collectors;
68import java.util.stream.Stream;
69
70import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Cascone1fb27d32017-08-25 20:40:20 +020071import static java.lang.String.format;
wu5f6c5b82017-08-04 16:45:19 +080072import static java.util.stream.Collectors.toSet;
73import static java.util.stream.Stream.concat;
74import static org.onlab.util.Tools.groupedThreads;
75import static org.onosproject.net.device.DeviceEvent.Type.*;
wu5f6c5b82017-08-04 16:45:19 +080076import static org.slf4j.LoggerFactory.getLogger;
77
78/**
79 * Abstract implementation of an app providing fabric connectivity for a 2-stage Clos topology of P4Runtime devices.
80 */
81@Component(immediate = true)
82public abstract class AbstractUpgradableFabricApp {
83
84 private static final Map<String, AbstractUpgradableFabricApp> APP_HANDLES = Maps.newConcurrentMap();
85
Carmelo Cascone3929cc82017-09-06 13:34:25 +020086 // TOPO_SIZE should be the same of the --size argument when running bmv2-demo.py
87 private static final int TOPO_SIZE = 2;
88 private static final boolean WITH_IMBALANCED_STRIPING = false;
89 protected static final int HASHED_LINKS = TOPO_SIZE + (WITH_IMBALANCED_STRIPING ? 1 : 0);
90
wu5f6c5b82017-08-04 16:45:19 +080091 private static final int FLOW_PRIORITY = 100;
Carmelo Cascone3929cc82017-09-06 13:34:25 +020092 private static final int CHECK_TOPOLOGY_INTERVAL_SECONDS = 5;
wu5f6c5b82017-08-04 16:45:19 +080093
94 private static final int CLEANUP_SLEEP = 2000;
95
96 protected final Logger log = getLogger(getClass());
97
wu5f6c5b82017-08-04 16:45:19 +080098 private final DeviceListener deviceListener = new InternalDeviceListener();
wu5f6c5b82017-08-04 16:45:19 +080099
100 private final ExecutorService executorService = Executors
101 .newFixedThreadPool(8, groupedThreads("onos/pi-demo-app", "pi-app-task", log));
102
Carmelo Cascone3929cc82017-09-06 13:34:25 +0200103 private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
104
wu5f6c5b82017-08-04 16:45:19 +0800105 private final String appName;
wu5f6c5b82017-08-04 16:45:19 +0800106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
108 protected TopologyService topologyService;
109
110 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
111 protected DeviceService deviceService;
112
113 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
114 private HostService hostService;
115
116 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
117 private FlowRuleService flowRuleService;
118
119 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
120 private ApplicationAdminService appService;
121
122 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
123 private CoreService coreService;
124
125 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
126 private PiPipeconfService piPipeconfService;
127
128 private boolean appActive = false;
129 private boolean appFreezed = false;
130
131 private boolean otherAppFound = false;
132 private AbstractUpgradableFabricApp otherApp;
133
134 private boolean flowRuleGenerated = false;
135 private ApplicationId appId;
136
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200137 private PiPipeconf appPipeconf;
wu5f6c5b82017-08-04 16:45:19 +0800138
139 private Set<DeviceId> leafSwitches;
140 private Set<DeviceId> spineSwitches;
141
142 private Map<DeviceId, List<FlowRule>> deviceFlowRules;
143 private Map<DeviceId, Boolean> pipeconfFlags;
144 private Map<DeviceId, Boolean> ruleFlags;
145
146 private ConcurrentMap<DeviceId, Lock> deviceLocks = Maps.newConcurrentMap();
147
148 /**
149 * Creates a new PI fabric app.
150 *
Carmelo Cascone3929cc82017-09-06 13:34:25 +0200151 * @param appName app name
152 * @param appPipeconf a P4Runtime device context to be used on devices
wu5f6c5b82017-08-04 16:45:19 +0800153 */
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200154 protected AbstractUpgradableFabricApp(String appName, PiPipeconf appPipeconf) {
wu5f6c5b82017-08-04 16:45:19 +0800155 this.appName = checkNotNull(appName);
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200156 this.appPipeconf = checkNotNull(appPipeconf);
wu5f6c5b82017-08-04 16:45:19 +0800157 }
158
159 @Activate
160 public void activate() {
161 log.info("Starting...");
162
163 appActive = true;
164 appFreezed = false;
165
166 if (APP_HANDLES.size() > 0) {
167 if (APP_HANDLES.size() > 1) {
168 throw new IllegalStateException("Found more than 1 active app handles");
169 }
170 otherAppFound = true;
171 otherApp = APP_HANDLES.values().iterator().next();
172 log.info("Found other fabric app active, signaling to freeze to {}...", otherApp.appName);
173 otherApp.setAppFreezed(true);
174 }
175
176 APP_HANDLES.put(appName, this);
177
178 appId = coreService.registerApplication(appName);
wu5f6c5b82017-08-04 16:45:19 +0800179 deviceService.addListener(deviceListener);
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200180 piPipeconfService.register(appPipeconf);
wu5f6c5b82017-08-04 16:45:19 +0800181
182 init();
183
184 log.info("STARTED", appId.id());
185 }
186
187 @Deactivate
188 public void deactivate() {
189 log.info("Stopping...");
190 try {
191 executorService.shutdown();
192 executorService.awaitTermination(5, TimeUnit.SECONDS);
193 } catch (InterruptedException e) {
194 List<Runnable> runningTasks = executorService.shutdownNow();
195 log.warn("Unable to stop the following tasks: {}", runningTasks);
196 }
Carmelo Cascone3929cc82017-09-06 13:34:25 +0200197 scheduledExecutorService.shutdown();
wu5f6c5b82017-08-04 16:45:19 +0800198 deviceService.removeListener(deviceListener);
wu5f6c5b82017-08-04 16:45:19 +0800199 flowRuleService.removeFlowRulesById(appId);
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200200 piPipeconfService.remove(appPipeconf.id());
wu5f6c5b82017-08-04 16:45:19 +0800201
202 appActive = false;
203 APP_HANDLES.remove(appName);
204
205 log.info("STOPPED");
206 }
207
208 private void init() {
209
210 // Reset any previous state
211 synchronized (this) {
212 flowRuleGenerated = Boolean.FALSE;
213 leafSwitches = Sets.newHashSet();
214 spineSwitches = Sets.newHashSet();
215 deviceFlowRules = Maps.newConcurrentMap();
216 ruleFlags = Maps.newConcurrentMap();
217 pipeconfFlags = Maps.newConcurrentMap();
218 }
219
Carmelo Cascone3929cc82017-09-06 13:34:25 +0200220 /*
221 Schedules a thread that periodically checks the topology, as soon as it corresponds to the expected
222 one, it generates the necessary flow rules and starts the deploy process on each device.
223 */
224 scheduledExecutorService.scheduleAtFixedRate(this::checkTopologyAndGenerateFlowRules,
225 0, CHECK_TOPOLOGY_INTERVAL_SECONDS, TimeUnit.SECONDS);
wu5f6c5b82017-08-04 16:45:19 +0800226 }
227
228 private void setAppFreezed(boolean appFreezed) {
229 this.appFreezed = appFreezed;
230 if (appFreezed) {
231 log.info("Freezing...");
232 } else {
233 log.info("Unfreezing...!");
234 }
235 }
236
237 /**
238 * Perform device initialization. Returns true if the operation was successful, false otherwise.
239 *
240 * @param deviceId a device id
241 * @return a boolean value
242 */
243 public abstract boolean initDevice(DeviceId deviceId);
244
245 /**
246 * Generates a list of flow rules for the given leaf switch, source host, destination hosts, spine switches and
247 * topology.
248 *
249 * @param leaf a leaf device id
250 * @param srcHost a source host
251 * @param dstHosts a collection of destination hosts
252 * @param spines a collection of spine device IDs
253 * @param topology a topology
254 * @return a list of flow rules
255 * @throws FlowRuleGeneratorException if flow rules cannot be generated
256 */
257 public abstract List<FlowRule> generateLeafRules(DeviceId leaf, Host srcHost, Collection<Host> dstHosts,
258 Collection<DeviceId> spines, Topology topology)
259 throws FlowRuleGeneratorException;
260
261 /**
262 * Generates a list of flow rules for the given spine switch, destination hosts and topology.
263 *
264 * @param deviceId a spine device id
265 * @param dstHosts a collection of destination hosts
266 * @param topology a topology
267 * @return a list of flow rules
268 * @throws FlowRuleGeneratorException if flow rules cannot be generated
269 */
270 public abstract List<FlowRule> generateSpineRules(DeviceId deviceId, Collection<Host> dstHosts, Topology topology)
271 throws FlowRuleGeneratorException;
272
273 private void deployAllDevices() {
274 if (otherAppFound && otherApp.appActive) {
275 log.info("Deactivating other app...");
276 appService.deactivate(otherApp.appId);
277 try {
278 Thread.sleep(CLEANUP_SLEEP);
279 } catch (InterruptedException e) {
280 log.warn("Cleanup sleep interrupted!");
281 Thread.interrupted();
282 }
283 }
284
285 Stream.concat(leafSwitches.stream(), spineSwitches.stream())
286 .map(deviceService::getDevice)
287 .forEach(device -> spawnTask(() -> deployDevice(device)));
288 }
289
290 /**
291 * Executes a device deploy.
292 *
293 * @param device a device
294 */
295 public void deployDevice(Device device) {
296
297 DeviceId deviceId = device.id();
298
299 // Synchronize executions over the same device.
300 Lock lock = deviceLocks.computeIfAbsent(deviceId, k -> new ReentrantLock());
301 lock.lock();
302
303 try {
Carmelo Cascone3929cc82017-09-06 13:34:25 +0200304 // Set pipeconf flag if not already done.
wu5f6c5b82017-08-04 16:45:19 +0800305 if (!pipeconfFlags.getOrDefault(deviceId, false)) {
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200306 if (piPipeconfService.ofDevice(deviceId).isPresent() &&
307 appPipeconf.id().equals(piPipeconfService.ofDevice(deviceId).get())) {
wu5f6c5b82017-08-04 16:45:19 +0800308 pipeconfFlags.put(device.id(), true);
309 } else {
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200310 log.warn("Wrong pipeconf for {}, expecting {}, but found {}, aborting deploy",
311 deviceId, appPipeconf.id(), piPipeconfService.ofDevice(deviceId).get());
312 return;
wu5f6c5b82017-08-04 16:45:19 +0800313 }
314 }
315
316 // Initialize device.
317 if (!initDevice(deviceId)) {
318 log.warn("Failed to initialize device {}", deviceId);
319 }
320
321 // Install rules.
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200322 if (!ruleFlags.getOrDefault(deviceId, false) &&
323 deviceFlowRules.containsKey(deviceId)) {
Carmelo Cascone3929cc82017-09-06 13:34:25 +0200324 log.info("Installing {} rules for {}...", deviceFlowRules.get(deviceId).size(), deviceId);
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200325 installFlowRules(deviceFlowRules.get(deviceId));
326 ruleFlags.put(deviceId, true);
wu5f6c5b82017-08-04 16:45:19 +0800327 }
328 } finally {
329 lock.unlock();
330 }
331 }
332
333 private void spawnTask(Runnable task) {
334 executorService.execute(task);
335 }
336
337
338 private void installFlowRules(Collection<FlowRule> rules) {
339 FlowRuleOperations.Builder opsBuilder = FlowRuleOperations.builder();
340 rules.forEach(opsBuilder::add);
341 flowRuleService.apply(opsBuilder.build());
342 }
343
wu5f6c5b82017-08-04 16:45:19 +0800344 /**
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200345 * Generates flow rules to provide host-to-host connectivity for the given topology and hosts.
wu5f6c5b82017-08-04 16:45:19 +0800346 */
Carmelo Cascone3929cc82017-09-06 13:34:25 +0200347 private synchronized void checkTopologyAndGenerateFlowRules() {
348
349 Topology topo = topologyService.currentTopology();
350 Set<Host> hosts = Sets.newHashSet(hostService.getHosts());
wu5f6c5b82017-08-04 16:45:19 +0800351
352 if (flowRuleGenerated) {
353 log.debug("Flow rules have been already generated, aborting...");
354 return;
355 }
356
357 log.debug("Starting flow rules generator...");
358
359 TopologyGraph graph = topologyService.getGraph(topo);
360 Set<DeviceId> spines = Sets.newHashSet();
361 Set<DeviceId> leafs = Sets.newHashSet();
362 graph.getVertexes().stream()
363 .map(TopologyVertex::deviceId)
364 .forEach(did -> (isSpine(did, topo) ? spines : leafs).add(did));
365
Carmelo Cascone3929cc82017-09-06 13:34:25 +0200366 if (spines.size() != TOPO_SIZE || leafs.size() != TOPO_SIZE) {
wu5f6c5b82017-08-04 16:45:19 +0800367 log.info("Invalid leaf/spine switches count, aborting... > leafCount={}, spineCount={}",
368 spines.size(), leafs.size());
369 return;
370 }
371
372 for (DeviceId did : spines) {
373 int portCount = deviceService.getPorts(did).size();
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200374 // Expected port count: num leafs + 1 redundant leaf link (if imbalanced)
Carmelo Cascone3929cc82017-09-06 13:34:25 +0200375 if (portCount != HASHED_LINKS) {
wu5f6c5b82017-08-04 16:45:19 +0800376 log.info("Invalid port count for spine, aborting... > deviceId={}, portCount={}", did, portCount);
377 return;
378 }
379 }
380 for (DeviceId did : leafs) {
381 int portCount = deviceService.getPorts(did).size();
382 // Expected port count: num spines + host port + 1 redundant spine link
Carmelo Cascone3929cc82017-09-06 13:34:25 +0200383 if (portCount != HASHED_LINKS + 1) {
wu5f6c5b82017-08-04 16:45:19 +0800384 log.info("Invalid port count for leaf, aborting... > deviceId={}, portCount={}", did, portCount);
385 return;
386 }
387 }
388
389 // Check hosts, number and exactly one per leaf
390 Map<DeviceId, Host> hostMap = Maps.newHashMap();
391 hosts.forEach(h -> hostMap.put(h.location().deviceId(), h));
Carmelo Cascone3929cc82017-09-06 13:34:25 +0200392 if (hosts.size() != TOPO_SIZE || !leafs.equals(hostMap.keySet())) {
wu5f6c5b82017-08-04 16:45:19 +0800393 log.info("Wrong host configuration, aborting... > hostCount={}, hostMapz={}", hosts.size(), hostMap);
394 return;
395 }
396
397 List<FlowRule> newFlowRules = Lists.newArrayList();
398
399 try {
400 for (DeviceId deviceId : leafs) {
401 Host srcHost = hostMap.get(deviceId);
402 Set<Host> dstHosts = hosts.stream().filter(h -> h != srcHost).collect(toSet());
403 newFlowRules.addAll(generateLeafRules(deviceId, srcHost, dstHosts, spines, topo));
404 }
405 for (DeviceId deviceId : spines) {
406 newFlowRules.addAll(generateSpineRules(deviceId, hosts, topo));
407 }
408 } catch (FlowRuleGeneratorException e) {
Carmelo Cascone3929cc82017-09-06 13:34:25 +0200409 log.warn("Exception while executing flow rule generator: {}", e.getMessage());
wu5f6c5b82017-08-04 16:45:19 +0800410 return;
411 }
412
413 if (newFlowRules.size() == 0) {
414 // Something went wrong
415 log.error("0 flow rules generated, BUG?");
416 return;
417 }
418
419 // All good!
420 // Divide flow rules per device id...
421 ImmutableMap.Builder<DeviceId, List<FlowRule>> mapBuilder = ImmutableMap.builder();
422 concat(spines.stream(), leafs.stream())
423 .map(deviceId -> ImmutableList.copyOf(newFlowRules
424 .stream()
425 .filter(fr -> fr.deviceId().equals(deviceId))
426 .iterator()))
427 .forEach(frs -> mapBuilder.put(frs.get(0).deviceId(), frs));
428 this.deviceFlowRules = mapBuilder.build();
429
430 this.leafSwitches = ImmutableSet.copyOf(leafs);
431 this.spineSwitches = ImmutableSet.copyOf(spines);
432
433 // Avoid other executions to modify the generated flow rules.
434 flowRuleGenerated = true;
435
436 log.info("Generated {} flow rules for {} devices", newFlowRules.size(), spines.size() + leafs.size());
437
438 spawnTask(this::deployAllDevices);
439 }
440
441 /**
442 * Returns a new, pre-configured flow rule builder.
443 *
444 * @param did a device id
445 * @param tableName a table name
446 * @return a new flow rule builder
447 */
448 protected FlowRule.Builder flowRuleBuilder(DeviceId did, String tableName) throws FlowRuleGeneratorException {
449
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200450 final Device device = deviceService.getDevice(did);
451 if (!device.is(PiPipelineInterpreter.class)) {
452 throw new FlowRuleGeneratorException(format("Device %s has no PiPipelineInterpreter", did));
wu5f6c5b82017-08-04 16:45:19 +0800453 }
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200454 final PiPipelineInterpreter interpreter = device.as(PiPipelineInterpreter.class);
455 final int flowRuleTableId;
wu5f6c5b82017-08-04 16:45:19 +0800456 if (interpreter.mapPiTableId(PiTableId.of(tableName)).isPresent()) {
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200457 flowRuleTableId = interpreter.mapPiTableId(PiTableId.of(tableName)).get();
wu5f6c5b82017-08-04 16:45:19 +0800458 } else {
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200459 throw new FlowRuleGeneratorException(format("Unknown table %s in interpreter", tableName));
wu5f6c5b82017-08-04 16:45:19 +0800460 }
461
462 return DefaultFlowRule.builder()
463 .forDevice(did)
464 .forTable(flowRuleTableId)
465 .fromApp(appId)
466 .withPriority(FLOW_PRIORITY)
467 .makePermanent();
468 }
469
470 private List<Port> getHostPorts(DeviceId deviceId, Topology topology) {
471 // Get all non-fabric ports.
472 return deviceService
473 .getPorts(deviceId)
474 .stream()
475 .filter(p -> !isFabricPort(p, topology))
476 .collect(Collectors.toList());
477 }
478
479 private boolean isSpine(DeviceId deviceId, Topology topology) {
480 // True if all ports are fabric.
481 return getHostPorts(deviceId, topology).size() == 0;
482 }
483
484 protected boolean isFabricPort(Port port, Topology topology) {
485 // True if the port connects this device to another infrastructure device.
486 return topologyService.isInfrastructure(topology, new ConnectPoint(port.element().id(), port.number()));
487 }
488
489 /**
wu5f6c5b82017-08-04 16:45:19 +0800490 * A listener of device events that executes a device deploy task each time a device is added, updated or
491 * re-connects.
492 */
493 private class InternalDeviceListener implements DeviceListener {
494 @Override
495 public void event(DeviceEvent event) {
496 spawnTask(() -> deployDevice(event.subject()));
497 }
498
499 @Override
500 public boolean isRelevant(DeviceEvent event) {
501 return !appFreezed &&
502 (event.type() == DEVICE_ADDED ||
503 event.type() == DEVICE_UPDATED ||
504 (event.type() == DEVICE_AVAILABILITY_CHANGED &&
505 deviceService.isAvailable(event.subject().id())));
506 }
507 }
508
509 /**
wu5f6c5b82017-08-04 16:45:19 +0800510 * An exception occurred while generating flow rules for this fabric.
511 */
512 public class FlowRuleGeneratorException extends Exception {
513
514 public FlowRuleGeneratorException() {
515 }
516
517 public FlowRuleGeneratorException(String msg) {
518 super(msg);
519 }
wu5f6c5b82017-08-04 16:45:19 +0800520 }
521}