blob: bd12b135e839ec858a5eb092b6d0b5500dd61a17 [file] [log] [blame]
wu5f6c5b82017-08-04 16:45:19 +08001/*
2 * Copyright 2017-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.pi.demo.app.common;
18
19import com.google.common.collect.ImmutableList;
20import com.google.common.collect.ImmutableMap;
21import com.google.common.collect.ImmutableSet;
22import com.google.common.collect.Lists;
23import com.google.common.collect.Maps;
24import com.google.common.collect.Sets;
25import org.apache.felix.scr.annotations.Activate;
26import org.apache.felix.scr.annotations.Component;
27import org.apache.felix.scr.annotations.Deactivate;
28import org.apache.felix.scr.annotations.Reference;
29import org.apache.felix.scr.annotations.ReferenceCardinality;
30import org.onosproject.app.ApplicationAdminService;
31import org.onosproject.core.ApplicationId;
32import org.onosproject.core.CoreService;
33import org.onosproject.net.ConnectPoint;
34import org.onosproject.net.Device;
35import org.onosproject.net.DeviceId;
36import org.onosproject.net.Host;
37import org.onosproject.net.Port;
38import org.onosproject.net.device.DeviceEvent;
39import org.onosproject.net.device.DeviceListener;
40import org.onosproject.net.device.DeviceService;
41import org.onosproject.net.flow.DefaultFlowRule;
42import org.onosproject.net.flow.FlowRule;
43import org.onosproject.net.flow.FlowRuleOperations;
44import org.onosproject.net.flow.FlowRuleService;
Carmelo Cascone5167f322017-11-21 21:58:50 -080045import org.onosproject.net.group.GroupService;
wu5f6c5b82017-08-04 16:45:19 +080046import org.onosproject.net.host.HostService;
47import org.onosproject.net.pi.model.PiPipeconf;
Carmelo Cascone6e854042017-09-11 21:37:53 +020048import org.onosproject.net.pi.model.PiPipeconfId;
wu5f6c5b82017-08-04 16:45:19 +080049import org.onosproject.net.pi.model.PiPipelineInterpreter;
Carmelo Cascone87892e22017-11-13 16:01:29 -080050import org.onosproject.net.pi.model.PiTableId;
Carmelo Cascone39c28ca2017-11-15 13:03:57 -080051import org.onosproject.net.pi.service.PiPipeconfService;
wu5f6c5b82017-08-04 16:45:19 +080052import org.onosproject.net.topology.Topology;
wu5f6c5b82017-08-04 16:45:19 +080053import org.onosproject.net.topology.TopologyGraph;
wu5f6c5b82017-08-04 16:45:19 +080054import org.onosproject.net.topology.TopologyService;
55import org.onosproject.net.topology.TopologyVertex;
56import org.slf4j.Logger;
57
Carmelo Cascone5167f322017-11-21 21:58:50 -080058import java.util.Arrays;
wu5f6c5b82017-08-04 16:45:19 +080059import java.util.Collection;
wu5f6c5b82017-08-04 16:45:19 +080060import java.util.List;
61import java.util.Map;
62import java.util.Set;
63import java.util.concurrent.ConcurrentMap;
64import java.util.concurrent.ExecutorService;
65import java.util.concurrent.Executors;
Carmelo Cascone3929cc82017-09-06 13:34:25 +020066import java.util.concurrent.ScheduledExecutorService;
wu5f6c5b82017-08-04 16:45:19 +080067import java.util.concurrent.TimeUnit;
68import java.util.concurrent.locks.Lock;
69import java.util.concurrent.locks.ReentrantLock;
70import java.util.stream.Collectors;
71import java.util.stream.Stream;
72
Carmelo Cascone6e854042017-09-11 21:37:53 +020073import static com.google.common.base.Preconditions.checkArgument;
wu5f6c5b82017-08-04 16:45:19 +080074import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Cascone1fb27d32017-08-25 20:40:20 +020075import static java.lang.String.format;
wu5f6c5b82017-08-04 16:45:19 +080076import static java.util.stream.Collectors.toSet;
77import static java.util.stream.Stream.concat;
78import static org.onlab.util.Tools.groupedThreads;
Carmelo Casconeca94bcf2017-10-27 14:16:59 -070079import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_ADDED;
80import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED;
81import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_UPDATED;
wu5f6c5b82017-08-04 16:45:19 +080082import static org.slf4j.LoggerFactory.getLogger;
83
84/**
Carmelo Cascone5167f322017-11-21 21:58:50 -080085 * Abstract implementation of an app providing fabric connectivity for a 2-stage
86 * Clos topology of P4Runtime devices.
wu5f6c5b82017-08-04 16:45:19 +080087 */
88@Component(immediate = true)
89public abstract class AbstractUpgradableFabricApp {
90
Carmelo Cascone5167f322017-11-21 21:58:50 -080091 private static final Map<String, AbstractUpgradableFabricApp>
92 APP_HANDLES = Maps.newConcurrentMap();
wu5f6c5b82017-08-04 16:45:19 +080093
Carmelo Cascone3929cc82017-09-06 13:34:25 +020094 // TOPO_SIZE should be the same of the --size argument when running bmv2-demo.py
95 private static final int TOPO_SIZE = 2;
96 private static final boolean WITH_IMBALANCED_STRIPING = false;
Carmelo Cascone5167f322017-11-21 21:58:50 -080097 private static final int HASHED_LINKS = TOPO_SIZE + (WITH_IMBALANCED_STRIPING ? 1 : 0);
Carmelo Cascone3929cc82017-09-06 13:34:25 +020098
wu5f6c5b82017-08-04 16:45:19 +080099 private static final int FLOW_PRIORITY = 100;
Carmelo Cascone3929cc82017-09-06 13:34:25 +0200100 private static final int CHECK_TOPOLOGY_INTERVAL_SECONDS = 5;
wu5f6c5b82017-08-04 16:45:19 +0800101
102 private static final int CLEANUP_SLEEP = 2000;
103
104 protected final Logger log = getLogger(getClass());
105
wu5f6c5b82017-08-04 16:45:19 +0800106 private final DeviceListener deviceListener = new InternalDeviceListener();
wu5f6c5b82017-08-04 16:45:19 +0800107
108 private final ExecutorService executorService = Executors
109 .newFixedThreadPool(8, groupedThreads("onos/pi-demo-app", "pi-app-task", log));
110
Carmelo Cascone3929cc82017-09-06 13:34:25 +0200111 private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
112
wu5f6c5b82017-08-04 16:45:19 +0800113 private final String appName;
wu5f6c5b82017-08-04 16:45:19 +0800114
115 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
116 protected TopologyService topologyService;
117
118 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
119 protected DeviceService deviceService;
120
121 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
122 private HostService hostService;
123
124 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
125 private FlowRuleService flowRuleService;
126
127 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Cascone5167f322017-11-21 21:58:50 -0800128 protected GroupService groupService;
129
130 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
wu5f6c5b82017-08-04 16:45:19 +0800131 private ApplicationAdminService appService;
132
133 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
134 private CoreService coreService;
135
136 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
137 private PiPipeconfService piPipeconfService;
138
139 private boolean appActive = false;
140 private boolean appFreezed = false;
141
142 private boolean otherAppFound = false;
143 private AbstractUpgradableFabricApp otherApp;
144
145 private boolean flowRuleGenerated = false;
Carmelo Cascone5167f322017-11-21 21:58:50 -0800146 protected ApplicationId appId;
wu5f6c5b82017-08-04 16:45:19 +0800147
Carmelo Cascone6e854042017-09-11 21:37:53 +0200148 private Collection<PiPipeconf> appPipeconfs;
wu5f6c5b82017-08-04 16:45:19 +0800149
150 private Set<DeviceId> leafSwitches;
151 private Set<DeviceId> spineSwitches;
152
153 private Map<DeviceId, List<FlowRule>> deviceFlowRules;
154 private Map<DeviceId, Boolean> pipeconfFlags;
155 private Map<DeviceId, Boolean> ruleFlags;
156
157 private ConcurrentMap<DeviceId, Lock> deviceLocks = Maps.newConcurrentMap();
158
159 /**
160 * Creates a new PI fabric app.
161 *
Carmelo Casconeca94bcf2017-10-27 14:16:59 -0700162 * @param appName app name
Carmelo Cascone6e854042017-09-11 21:37:53 +0200163 * @param appPipeconfs collection of compatible pipeconfs
wu5f6c5b82017-08-04 16:45:19 +0800164 */
Carmelo Cascone5167f322017-11-21 21:58:50 -0800165 protected AbstractUpgradableFabricApp(String appName,
166 Collection<PiPipeconf> appPipeconfs) {
wu5f6c5b82017-08-04 16:45:19 +0800167 this.appName = checkNotNull(appName);
Carmelo Cascone6e854042017-09-11 21:37:53 +0200168 this.appPipeconfs = checkNotNull(appPipeconfs);
169 checkArgument(appPipeconfs.size() > 0, "appPipeconfs cannot have size 0");
wu5f6c5b82017-08-04 16:45:19 +0800170 }
171
172 @Activate
173 public void activate() {
174 log.info("Starting...");
175
176 appActive = true;
177 appFreezed = false;
178
179 if (APP_HANDLES.size() > 0) {
180 if (APP_HANDLES.size() > 1) {
Carmelo Cascone5167f322017-11-21 21:58:50 -0800181 throw new IllegalStateException(
182 "Found more than 1 active app handles");
wu5f6c5b82017-08-04 16:45:19 +0800183 }
184 otherAppFound = true;
185 otherApp = APP_HANDLES.values().iterator().next();
Carmelo Cascone5167f322017-11-21 21:58:50 -0800186 log.info("Found other fabric app active, signaling to freeze to {}...",
187 otherApp.appName);
wu5f6c5b82017-08-04 16:45:19 +0800188 otherApp.setAppFreezed(true);
189 }
190
191 APP_HANDLES.put(appName, this);
192
193 appId = coreService.registerApplication(appName);
wu5f6c5b82017-08-04 16:45:19 +0800194 deviceService.addListener(deviceListener);
wu5f6c5b82017-08-04 16:45:19 +0800195
196 init();
197
198 log.info("STARTED", appId.id());
199 }
200
201 @Deactivate
202 public void deactivate() {
203 log.info("Stopping...");
204 try {
205 executorService.shutdown();
206 executorService.awaitTermination(5, TimeUnit.SECONDS);
207 } catch (InterruptedException e) {
208 List<Runnable> runningTasks = executorService.shutdownNow();
209 log.warn("Unable to stop the following tasks: {}", runningTasks);
Ray Milkey5c7d4882018-02-05 14:50:39 -0800210 Thread.currentThread().interrupt();
wu5f6c5b82017-08-04 16:45:19 +0800211 }
Carmelo Cascone3929cc82017-09-06 13:34:25 +0200212 scheduledExecutorService.shutdown();
wu5f6c5b82017-08-04 16:45:19 +0800213 deviceService.removeListener(deviceListener);
wu5f6c5b82017-08-04 16:45:19 +0800214 flowRuleService.removeFlowRulesById(appId);
wu5f6c5b82017-08-04 16:45:19 +0800215
216 appActive = false;
217 APP_HANDLES.remove(appName);
218
219 log.info("STOPPED");
220 }
221
222 private void init() {
223
224 // Reset any previous state
225 synchronized (this) {
226 flowRuleGenerated = Boolean.FALSE;
227 leafSwitches = Sets.newHashSet();
228 spineSwitches = Sets.newHashSet();
229 deviceFlowRules = Maps.newConcurrentMap();
230 ruleFlags = Maps.newConcurrentMap();
231 pipeconfFlags = Maps.newConcurrentMap();
232 }
233
Carmelo Cascone5167f322017-11-21 21:58:50 -0800234 // Schedules a thread that periodically checks the topology, as soon as
235 // it corresponds to the expected one, it generates the necessary flow
236 // rules and starts the deploy process on each device.
237 scheduledExecutorService.scheduleAtFixedRate(
238 this::checkTopologyAndGenerateFlowRules,
239 0, CHECK_TOPOLOGY_INTERVAL_SECONDS, TimeUnit.SECONDS);
wu5f6c5b82017-08-04 16:45:19 +0800240 }
241
242 private void setAppFreezed(boolean appFreezed) {
243 this.appFreezed = appFreezed;
244 if (appFreezed) {
245 log.info("Freezing...");
246 } else {
247 log.info("Unfreezing...!");
248 }
249 }
250
251 /**
Carmelo Cascone5167f322017-11-21 21:58:50 -0800252 * Perform device initialization. Returns true if the operation was
253 * successful, false otherwise.
wu5f6c5b82017-08-04 16:45:19 +0800254 *
255 * @param deviceId a device id
256 * @return a boolean value
257 */
258 public abstract boolean initDevice(DeviceId deviceId);
259
260 /**
Carmelo Cascone5167f322017-11-21 21:58:50 -0800261 * Generates a list of flow rules for the given leaf switch, source host,
262 * destination hosts, spine switches and topology.
wu5f6c5b82017-08-04 16:45:19 +0800263 *
264 * @param leaf a leaf device id
265 * @param srcHost a source host
266 * @param dstHosts a collection of destination hosts
267 * @param spines a collection of spine device IDs
268 * @param topology a topology
269 * @return a list of flow rules
270 * @throws FlowRuleGeneratorException if flow rules cannot be generated
271 */
Carmelo Cascone5167f322017-11-21 21:58:50 -0800272 public abstract List<FlowRule> generateLeafRules(DeviceId leaf, Host srcHost,
273 Set<Host> dstHosts,
274 Set<DeviceId> spines,
275 Topology topology)
wu5f6c5b82017-08-04 16:45:19 +0800276 throws FlowRuleGeneratorException;
277
278 /**
Carmelo Cascone5167f322017-11-21 21:58:50 -0800279 * Generates a list of flow rules for the given spine switch, destination
280 * hosts and topology.
wu5f6c5b82017-08-04 16:45:19 +0800281 *
282 * @param deviceId a spine device id
283 * @param dstHosts a collection of destination hosts
284 * @param topology a topology
285 * @return a list of flow rules
286 * @throws FlowRuleGeneratorException if flow rules cannot be generated
287 */
Carmelo Cascone5167f322017-11-21 21:58:50 -0800288 public abstract List<FlowRule> generateSpineRules(DeviceId deviceId,
289 Set<Host> dstHosts,
290 Topology topology)
wu5f6c5b82017-08-04 16:45:19 +0800291 throws FlowRuleGeneratorException;
292
293 private void deployAllDevices() {
294 if (otherAppFound && otherApp.appActive) {
295 log.info("Deactivating other app...");
296 appService.deactivate(otherApp.appId);
297 try {
298 Thread.sleep(CLEANUP_SLEEP);
299 } catch (InterruptedException e) {
300 log.warn("Cleanup sleep interrupted!");
Ray Milkey5c7d4882018-02-05 14:50:39 -0800301 Thread.currentThread().interrupt();
wu5f6c5b82017-08-04 16:45:19 +0800302 }
303 }
304
305 Stream.concat(leafSwitches.stream(), spineSwitches.stream())
306 .map(deviceService::getDevice)
307 .forEach(device -> spawnTask(() -> deployDevice(device)));
308 }
309
Carmelo Cascone6e854042017-09-11 21:37:53 +0200310 private boolean matchPipeconf(PiPipeconfId piPipeconfId) {
311 return appPipeconfs.stream()
312 .anyMatch(p -> p.id().equals(piPipeconfId));
313 }
314
wu5f6c5b82017-08-04 16:45:19 +0800315 /**
316 * Executes a device deploy.
317 *
318 * @param device a device
319 */
Carmelo Cascone5167f322017-11-21 21:58:50 -0800320 private void deployDevice(Device device) {
wu5f6c5b82017-08-04 16:45:19 +0800321
322 DeviceId deviceId = device.id();
323
324 // Synchronize executions over the same device.
325 Lock lock = deviceLocks.computeIfAbsent(deviceId, k -> new ReentrantLock());
326 lock.lock();
327
328 try {
Carmelo Cascone3929cc82017-09-06 13:34:25 +0200329 // Set pipeconf flag if not already done.
wu5f6c5b82017-08-04 16:45:19 +0800330 if (!pipeconfFlags.getOrDefault(deviceId, false)) {
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200331 if (piPipeconfService.ofDevice(deviceId).isPresent() &&
Carmelo Cascone6e854042017-09-11 21:37:53 +0200332 matchPipeconf(piPipeconfService.ofDevice(deviceId).get())) {
wu5f6c5b82017-08-04 16:45:19 +0800333 pipeconfFlags.put(device.id(), true);
334 } else {
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200335 log.warn("Wrong pipeconf for {}, expecting {}, but found {}, aborting deploy",
Carmelo Cascone5167f322017-11-21 21:58:50 -0800336 deviceId, Arrays.toString(appPipeconfs.toArray()),
Carmelo Cascone6e854042017-09-11 21:37:53 +0200337 piPipeconfService.ofDevice(deviceId).get());
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200338 return;
wu5f6c5b82017-08-04 16:45:19 +0800339 }
340 }
341
342 // Initialize device.
343 if (!initDevice(deviceId)) {
344 log.warn("Failed to initialize device {}", deviceId);
345 }
346
347 // Install rules.
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200348 if (!ruleFlags.getOrDefault(deviceId, false) &&
349 deviceFlowRules.containsKey(deviceId)) {
Carmelo Cascone5167f322017-11-21 21:58:50 -0800350 log.info("Installing {} rules for {}...",
351 deviceFlowRules.get(deviceId).size(), deviceId);
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200352 installFlowRules(deviceFlowRules.get(deviceId));
353 ruleFlags.put(deviceId, true);
wu5f6c5b82017-08-04 16:45:19 +0800354 }
355 } finally {
356 lock.unlock();
357 }
358 }
359
360 private void spawnTask(Runnable task) {
361 executorService.execute(task);
362 }
363
364
365 private void installFlowRules(Collection<FlowRule> rules) {
366 FlowRuleOperations.Builder opsBuilder = FlowRuleOperations.builder();
367 rules.forEach(opsBuilder::add);
368 flowRuleService.apply(opsBuilder.build());
369 }
370
wu5f6c5b82017-08-04 16:45:19 +0800371 /**
Carmelo Cascone5167f322017-11-21 21:58:50 -0800372 * Generates flow rules to provide host-to-host connectivity for the given
373 * topology and hosts.
wu5f6c5b82017-08-04 16:45:19 +0800374 */
Carmelo Cascone3929cc82017-09-06 13:34:25 +0200375 private synchronized void checkTopologyAndGenerateFlowRules() {
376
377 Topology topo = topologyService.currentTopology();
378 Set<Host> hosts = Sets.newHashSet(hostService.getHosts());
wu5f6c5b82017-08-04 16:45:19 +0800379
380 if (flowRuleGenerated) {
381 log.debug("Flow rules have been already generated, aborting...");
382 return;
383 }
384
385 log.debug("Starting flow rules generator...");
386
387 TopologyGraph graph = topologyService.getGraph(topo);
388 Set<DeviceId> spines = Sets.newHashSet();
389 Set<DeviceId> leafs = Sets.newHashSet();
390 graph.getVertexes().stream()
391 .map(TopologyVertex::deviceId)
392 .forEach(did -> (isSpine(did, topo) ? spines : leafs).add(did));
393
Carmelo Cascone3929cc82017-09-06 13:34:25 +0200394 if (spines.size() != TOPO_SIZE || leafs.size() != TOPO_SIZE) {
Carmelo Cascone5167f322017-11-21 21:58:50 -0800395 log.info("Invalid leaf/spine count, aborting... > leafCount={}, spineCount={}",
wu5f6c5b82017-08-04 16:45:19 +0800396 spines.size(), leafs.size());
397 return;
398 }
399
400 for (DeviceId did : spines) {
401 int portCount = deviceService.getPorts(did).size();
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200402 // Expected port count: num leafs + 1 redundant leaf link (if imbalanced)
Carmelo Cascone3929cc82017-09-06 13:34:25 +0200403 if (portCount != HASHED_LINKS) {
Carmelo Cascone5167f322017-11-21 21:58:50 -0800404 log.info("Invalid port count for spine, aborting... > deviceId={}, portCount={}",
405 did, portCount);
wu5f6c5b82017-08-04 16:45:19 +0800406 return;
407 }
408 }
409 for (DeviceId did : leafs) {
410 int portCount = deviceService.getPorts(did).size();
411 // Expected port count: num spines + host port + 1 redundant spine link
Carmelo Cascone3929cc82017-09-06 13:34:25 +0200412 if (portCount != HASHED_LINKS + 1) {
Carmelo Cascone5167f322017-11-21 21:58:50 -0800413 log.info("Invalid port count for leaf, aborting... > deviceId={}, portCount={}",
414 did, portCount);
wu5f6c5b82017-08-04 16:45:19 +0800415 return;
416 }
417 }
418
419 // Check hosts, number and exactly one per leaf
420 Map<DeviceId, Host> hostMap = Maps.newHashMap();
421 hosts.forEach(h -> hostMap.put(h.location().deviceId(), h));
Carmelo Cascone3929cc82017-09-06 13:34:25 +0200422 if (hosts.size() != TOPO_SIZE || !leafs.equals(hostMap.keySet())) {
Carmelo Cascone5167f322017-11-21 21:58:50 -0800423 log.info("Wrong host configuration, aborting... > hostCount={}, hostMapz={}",
424 hosts.size(), hostMap);
wu5f6c5b82017-08-04 16:45:19 +0800425 return;
426 }
427
428 List<FlowRule> newFlowRules = Lists.newArrayList();
429
430 try {
431 for (DeviceId deviceId : leafs) {
432 Host srcHost = hostMap.get(deviceId);
Carmelo Cascone5167f322017-11-21 21:58:50 -0800433 Set<Host> dstHosts = hosts.stream()
434 .filter(h -> h != srcHost)
435 .collect(toSet());
436 newFlowRules.addAll(generateLeafRules(deviceId, srcHost,
437 dstHosts, spines, topo));
wu5f6c5b82017-08-04 16:45:19 +0800438 }
439 for (DeviceId deviceId : spines) {
440 newFlowRules.addAll(generateSpineRules(deviceId, hosts, topo));
441 }
442 } catch (FlowRuleGeneratorException e) {
Carmelo Cascone5167f322017-11-21 21:58:50 -0800443 log.warn("Exception while executing flow rule generator: {}",
444 e.getMessage());
wu5f6c5b82017-08-04 16:45:19 +0800445 return;
446 }
447
448 if (newFlowRules.size() == 0) {
449 // Something went wrong
450 log.error("0 flow rules generated, BUG?");
451 return;
452 }
453
454 // All good!
455 // Divide flow rules per device id...
Carmelo Cascone5167f322017-11-21 21:58:50 -0800456 ImmutableMap.Builder<DeviceId, List<FlowRule>> mapBuilder =
457 ImmutableMap.builder();
wu5f6c5b82017-08-04 16:45:19 +0800458 concat(spines.stream(), leafs.stream())
Carmelo Cascone5167f322017-11-21 21:58:50 -0800459 .map(deviceId -> ImmutableList.copyOf(
460 newFlowRules.stream()
461 .filter(fr -> fr.deviceId().equals(deviceId))
462 .iterator()))
wu5f6c5b82017-08-04 16:45:19 +0800463 .forEach(frs -> mapBuilder.put(frs.get(0).deviceId(), frs));
464 this.deviceFlowRules = mapBuilder.build();
465
466 this.leafSwitches = ImmutableSet.copyOf(leafs);
467 this.spineSwitches = ImmutableSet.copyOf(spines);
468
469 // Avoid other executions to modify the generated flow rules.
470 flowRuleGenerated = true;
471
Carmelo Cascone5167f322017-11-21 21:58:50 -0800472 log.info("Generated {} flow rules for {} devices",
473 newFlowRules.size(), spines.size() + leafs.size());
wu5f6c5b82017-08-04 16:45:19 +0800474
475 spawnTask(this::deployAllDevices);
476 }
477
478 /**
479 * Returns a new, pre-configured flow rule builder.
480 *
Carmelo Casconeca94bcf2017-10-27 14:16:59 -0700481 * @param did a device id
482 * @param tableId a table id
wu5f6c5b82017-08-04 16:45:19 +0800483 * @return a new flow rule builder
Thomas Vachuskaa01ef782018-07-25 14:07:11 -0700484 * @throws FlowRuleGeneratorException if device has no pipeline interpreter
wu5f6c5b82017-08-04 16:45:19 +0800485 */
Carmelo Cascone5167f322017-11-21 21:58:50 -0800486 protected FlowRule.Builder flowRuleBuilder(DeviceId did, PiTableId tableId)
487 throws FlowRuleGeneratorException {
wu5f6c5b82017-08-04 16:45:19 +0800488
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200489 final Device device = deviceService.getDevice(did);
490 if (!device.is(PiPipelineInterpreter.class)) {
Carmelo Cascone5167f322017-11-21 21:58:50 -0800491 throw new FlowRuleGeneratorException(format(
492 "Device %s has no PiPipelineInterpreter", did));
wu5f6c5b82017-08-04 16:45:19 +0800493 }
wu5f6c5b82017-08-04 16:45:19 +0800494
495 return DefaultFlowRule.builder()
496 .forDevice(did)
Carmelo Cascone03f343d2017-11-13 16:54:39 -0800497 .forTable(tableId)
wu5f6c5b82017-08-04 16:45:19 +0800498 .fromApp(appId)
499 .withPriority(FLOW_PRIORITY)
500 .makePermanent();
501 }
502
503 private List<Port> getHostPorts(DeviceId deviceId, Topology topology) {
504 // Get all non-fabric ports.
505 return deviceService
506 .getPorts(deviceId)
507 .stream()
508 .filter(p -> !isFabricPort(p, topology))
509 .collect(Collectors.toList());
510 }
511
512 private boolean isSpine(DeviceId deviceId, Topology topology) {
513 // True if all ports are fabric.
514 return getHostPorts(deviceId, topology).size() == 0;
515 }
516
517 protected boolean isFabricPort(Port port, Topology topology) {
518 // True if the port connects this device to another infrastructure device.
Carmelo Cascone5167f322017-11-21 21:58:50 -0800519 return topologyService.isInfrastructure(
520 topology, new ConnectPoint(port.element().id(), port.number()));
wu5f6c5b82017-08-04 16:45:19 +0800521 }
522
523 /**
Carmelo Cascone5167f322017-11-21 21:58:50 -0800524 * A listener of device events that executes a device deploy task each time
525 * a device is added, updated or re-connects.
wu5f6c5b82017-08-04 16:45:19 +0800526 */
527 private class InternalDeviceListener implements DeviceListener {
528 @Override
529 public void event(DeviceEvent event) {
530 spawnTask(() -> deployDevice(event.subject()));
531 }
532
533 @Override
534 public boolean isRelevant(DeviceEvent event) {
535 return !appFreezed &&
536 (event.type() == DEVICE_ADDED ||
537 event.type() == DEVICE_UPDATED ||
538 (event.type() == DEVICE_AVAILABILITY_CHANGED &&
539 deviceService.isAvailable(event.subject().id())));
540 }
541 }
542
543 /**
wu5f6c5b82017-08-04 16:45:19 +0800544 * An exception occurred while generating flow rules for this fabric.
545 */
Carmelo Cascone5167f322017-11-21 21:58:50 -0800546 public static class FlowRuleGeneratorException extends Exception {
wu5f6c5b82017-08-04 16:45:19 +0800547
548 public FlowRuleGeneratorException() {
549 }
550
Carmelo Cascone5167f322017-11-21 21:58:50 -0800551 FlowRuleGeneratorException(String msg) {
wu5f6c5b82017-08-04 16:45:19 +0800552 super(msg);
553 }
wu5f6c5b82017-08-04 16:45:19 +0800554 }
Carmelo Cascone87892e22017-11-13 16:01:29 -0800555}