blob: 7c91bf3dc0144a538cf49163771d18a75cbdb350 [file] [log] [blame]
wu5f6c5b82017-08-04 16:45:19 +08001/*
2 * Copyright 2017-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.pi.demo.app.common;
18
19import com.google.common.collect.ImmutableList;
20import com.google.common.collect.ImmutableMap;
21import com.google.common.collect.ImmutableSet;
22import com.google.common.collect.Lists;
23import com.google.common.collect.Maps;
24import com.google.common.collect.Sets;
25import org.apache.felix.scr.annotations.Activate;
26import org.apache.felix.scr.annotations.Component;
27import org.apache.felix.scr.annotations.Deactivate;
28import org.apache.felix.scr.annotations.Reference;
29import org.apache.felix.scr.annotations.ReferenceCardinality;
30import org.onosproject.app.ApplicationAdminService;
31import org.onosproject.core.ApplicationId;
32import org.onosproject.core.CoreService;
33import org.onosproject.net.ConnectPoint;
34import org.onosproject.net.Device;
35import org.onosproject.net.DeviceId;
36import org.onosproject.net.Host;
37import org.onosproject.net.Port;
38import org.onosproject.net.device.DeviceEvent;
39import org.onosproject.net.device.DeviceListener;
40import org.onosproject.net.device.DeviceService;
41import org.onosproject.net.flow.DefaultFlowRule;
42import org.onosproject.net.flow.FlowRule;
43import org.onosproject.net.flow.FlowRuleOperations;
44import org.onosproject.net.flow.FlowRuleService;
Carmelo Cascone5167f322017-11-21 21:58:50 -080045import org.onosproject.net.group.GroupService;
wu5f6c5b82017-08-04 16:45:19 +080046import org.onosproject.net.host.HostService;
47import org.onosproject.net.pi.model.PiPipeconf;
Carmelo Cascone6e854042017-09-11 21:37:53 +020048import org.onosproject.net.pi.model.PiPipeconfId;
wu5f6c5b82017-08-04 16:45:19 +080049import org.onosproject.net.pi.model.PiPipelineInterpreter;
Carmelo Cascone87892e22017-11-13 16:01:29 -080050import org.onosproject.net.pi.model.PiTableId;
Carmelo Cascone39c28ca2017-11-15 13:03:57 -080051import org.onosproject.net.pi.service.PiPipeconfService;
wu5f6c5b82017-08-04 16:45:19 +080052import org.onosproject.net.topology.Topology;
wu5f6c5b82017-08-04 16:45:19 +080053import org.onosproject.net.topology.TopologyGraph;
wu5f6c5b82017-08-04 16:45:19 +080054import org.onosproject.net.topology.TopologyService;
55import org.onosproject.net.topology.TopologyVertex;
56import org.slf4j.Logger;
57
Carmelo Cascone5167f322017-11-21 21:58:50 -080058import java.util.Arrays;
wu5f6c5b82017-08-04 16:45:19 +080059import java.util.Collection;
wu5f6c5b82017-08-04 16:45:19 +080060import java.util.List;
61import java.util.Map;
62import java.util.Set;
63import java.util.concurrent.ConcurrentMap;
64import java.util.concurrent.ExecutorService;
65import java.util.concurrent.Executors;
Carmelo Cascone3929cc82017-09-06 13:34:25 +020066import java.util.concurrent.ScheduledExecutorService;
wu5f6c5b82017-08-04 16:45:19 +080067import java.util.concurrent.TimeUnit;
68import java.util.concurrent.locks.Lock;
69import java.util.concurrent.locks.ReentrantLock;
70import java.util.stream.Collectors;
71import java.util.stream.Stream;
72
Carmelo Cascone6e854042017-09-11 21:37:53 +020073import static com.google.common.base.Preconditions.checkArgument;
wu5f6c5b82017-08-04 16:45:19 +080074import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Cascone1fb27d32017-08-25 20:40:20 +020075import static java.lang.String.format;
wu5f6c5b82017-08-04 16:45:19 +080076import static java.util.stream.Collectors.toSet;
77import static java.util.stream.Stream.concat;
78import static org.onlab.util.Tools.groupedThreads;
Carmelo Casconeca94bcf2017-10-27 14:16:59 -070079import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_ADDED;
80import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED;
81import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_UPDATED;
wu5f6c5b82017-08-04 16:45:19 +080082import static org.slf4j.LoggerFactory.getLogger;
83
84/**
Carmelo Cascone5167f322017-11-21 21:58:50 -080085 * Abstract implementation of an app providing fabric connectivity for a 2-stage
86 * Clos topology of P4Runtime devices.
wu5f6c5b82017-08-04 16:45:19 +080087 */
88@Component(immediate = true)
89public abstract class AbstractUpgradableFabricApp {
90
Carmelo Cascone5167f322017-11-21 21:58:50 -080091 private static final Map<String, AbstractUpgradableFabricApp>
92 APP_HANDLES = Maps.newConcurrentMap();
wu5f6c5b82017-08-04 16:45:19 +080093
Carmelo Cascone3929cc82017-09-06 13:34:25 +020094 // TOPO_SIZE should be the same of the --size argument when running bmv2-demo.py
95 private static final int TOPO_SIZE = 2;
96 private static final boolean WITH_IMBALANCED_STRIPING = false;
Carmelo Cascone5167f322017-11-21 21:58:50 -080097 private static final int HASHED_LINKS = TOPO_SIZE + (WITH_IMBALANCED_STRIPING ? 1 : 0);
Carmelo Cascone3929cc82017-09-06 13:34:25 +020098
wu5f6c5b82017-08-04 16:45:19 +080099 private static final int FLOW_PRIORITY = 100;
Carmelo Cascone3929cc82017-09-06 13:34:25 +0200100 private static final int CHECK_TOPOLOGY_INTERVAL_SECONDS = 5;
wu5f6c5b82017-08-04 16:45:19 +0800101
102 private static final int CLEANUP_SLEEP = 2000;
103
104 protected final Logger log = getLogger(getClass());
105
wu5f6c5b82017-08-04 16:45:19 +0800106 private final DeviceListener deviceListener = new InternalDeviceListener();
wu5f6c5b82017-08-04 16:45:19 +0800107
108 private final ExecutorService executorService = Executors
109 .newFixedThreadPool(8, groupedThreads("onos/pi-demo-app", "pi-app-task", log));
110
Carmelo Cascone3929cc82017-09-06 13:34:25 +0200111 private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
112
wu5f6c5b82017-08-04 16:45:19 +0800113 private final String appName;
wu5f6c5b82017-08-04 16:45:19 +0800114
115 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
116 protected TopologyService topologyService;
117
118 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
119 protected DeviceService deviceService;
120
121 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
122 private HostService hostService;
123
124 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
125 private FlowRuleService flowRuleService;
126
127 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Carmelo Cascone5167f322017-11-21 21:58:50 -0800128 protected GroupService groupService;
129
130 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
wu5f6c5b82017-08-04 16:45:19 +0800131 private ApplicationAdminService appService;
132
133 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
134 private CoreService coreService;
135
136 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
137 private PiPipeconfService piPipeconfService;
138
139 private boolean appActive = false;
140 private boolean appFreezed = false;
141
142 private boolean otherAppFound = false;
143 private AbstractUpgradableFabricApp otherApp;
144
145 private boolean flowRuleGenerated = false;
Carmelo Cascone5167f322017-11-21 21:58:50 -0800146 protected ApplicationId appId;
wu5f6c5b82017-08-04 16:45:19 +0800147
Carmelo Cascone6e854042017-09-11 21:37:53 +0200148 private Collection<PiPipeconf> appPipeconfs;
wu5f6c5b82017-08-04 16:45:19 +0800149
150 private Set<DeviceId> leafSwitches;
151 private Set<DeviceId> spineSwitches;
152
153 private Map<DeviceId, List<FlowRule>> deviceFlowRules;
154 private Map<DeviceId, Boolean> pipeconfFlags;
155 private Map<DeviceId, Boolean> ruleFlags;
156
157 private ConcurrentMap<DeviceId, Lock> deviceLocks = Maps.newConcurrentMap();
158
159 /**
160 * Creates a new PI fabric app.
161 *
Carmelo Casconeca94bcf2017-10-27 14:16:59 -0700162 * @param appName app name
Carmelo Cascone6e854042017-09-11 21:37:53 +0200163 * @param appPipeconfs collection of compatible pipeconfs
wu5f6c5b82017-08-04 16:45:19 +0800164 */
Carmelo Cascone5167f322017-11-21 21:58:50 -0800165 protected AbstractUpgradableFabricApp(String appName,
166 Collection<PiPipeconf> appPipeconfs) {
wu5f6c5b82017-08-04 16:45:19 +0800167 this.appName = checkNotNull(appName);
Carmelo Cascone6e854042017-09-11 21:37:53 +0200168 this.appPipeconfs = checkNotNull(appPipeconfs);
169 checkArgument(appPipeconfs.size() > 0, "appPipeconfs cannot have size 0");
wu5f6c5b82017-08-04 16:45:19 +0800170 }
171
172 @Activate
173 public void activate() {
174 log.info("Starting...");
175
176 appActive = true;
177 appFreezed = false;
178
179 if (APP_HANDLES.size() > 0) {
180 if (APP_HANDLES.size() > 1) {
Carmelo Cascone5167f322017-11-21 21:58:50 -0800181 throw new IllegalStateException(
182 "Found more than 1 active app handles");
wu5f6c5b82017-08-04 16:45:19 +0800183 }
184 otherAppFound = true;
185 otherApp = APP_HANDLES.values().iterator().next();
Carmelo Cascone5167f322017-11-21 21:58:50 -0800186 log.info("Found other fabric app active, signaling to freeze to {}...",
187 otherApp.appName);
wu5f6c5b82017-08-04 16:45:19 +0800188 otherApp.setAppFreezed(true);
189 }
190
191 APP_HANDLES.put(appName, this);
192
193 appId = coreService.registerApplication(appName);
wu5f6c5b82017-08-04 16:45:19 +0800194 deviceService.addListener(deviceListener);
wu5f6c5b82017-08-04 16:45:19 +0800195
196 init();
197
198 log.info("STARTED", appId.id());
199 }
200
201 @Deactivate
202 public void deactivate() {
203 log.info("Stopping...");
204 try {
205 executorService.shutdown();
206 executorService.awaitTermination(5, TimeUnit.SECONDS);
207 } catch (InterruptedException e) {
208 List<Runnable> runningTasks = executorService.shutdownNow();
209 log.warn("Unable to stop the following tasks: {}", runningTasks);
210 }
Carmelo Cascone3929cc82017-09-06 13:34:25 +0200211 scheduledExecutorService.shutdown();
wu5f6c5b82017-08-04 16:45:19 +0800212 deviceService.removeListener(deviceListener);
wu5f6c5b82017-08-04 16:45:19 +0800213 flowRuleService.removeFlowRulesById(appId);
wu5f6c5b82017-08-04 16:45:19 +0800214
215 appActive = false;
216 APP_HANDLES.remove(appName);
217
218 log.info("STOPPED");
219 }
220
221 private void init() {
222
223 // Reset any previous state
224 synchronized (this) {
225 flowRuleGenerated = Boolean.FALSE;
226 leafSwitches = Sets.newHashSet();
227 spineSwitches = Sets.newHashSet();
228 deviceFlowRules = Maps.newConcurrentMap();
229 ruleFlags = Maps.newConcurrentMap();
230 pipeconfFlags = Maps.newConcurrentMap();
231 }
232
Carmelo Cascone5167f322017-11-21 21:58:50 -0800233 // Schedules a thread that periodically checks the topology, as soon as
234 // it corresponds to the expected one, it generates the necessary flow
235 // rules and starts the deploy process on each device.
236 scheduledExecutorService.scheduleAtFixedRate(
237 this::checkTopologyAndGenerateFlowRules,
238 0, CHECK_TOPOLOGY_INTERVAL_SECONDS, TimeUnit.SECONDS);
wu5f6c5b82017-08-04 16:45:19 +0800239 }
240
241 private void setAppFreezed(boolean appFreezed) {
242 this.appFreezed = appFreezed;
243 if (appFreezed) {
244 log.info("Freezing...");
245 } else {
246 log.info("Unfreezing...!");
247 }
248 }
249
250 /**
Carmelo Cascone5167f322017-11-21 21:58:50 -0800251 * Perform device initialization. Returns true if the operation was
252 * successful, false otherwise.
wu5f6c5b82017-08-04 16:45:19 +0800253 *
254 * @param deviceId a device id
255 * @return a boolean value
256 */
257 public abstract boolean initDevice(DeviceId deviceId);
258
259 /**
Carmelo Cascone5167f322017-11-21 21:58:50 -0800260 * Generates a list of flow rules for the given leaf switch, source host,
261 * destination hosts, spine switches and topology.
wu5f6c5b82017-08-04 16:45:19 +0800262 *
263 * @param leaf a leaf device id
264 * @param srcHost a source host
265 * @param dstHosts a collection of destination hosts
266 * @param spines a collection of spine device IDs
267 * @param topology a topology
268 * @return a list of flow rules
269 * @throws FlowRuleGeneratorException if flow rules cannot be generated
270 */
Carmelo Cascone5167f322017-11-21 21:58:50 -0800271 public abstract List<FlowRule> generateLeafRules(DeviceId leaf, Host srcHost,
272 Set<Host> dstHosts,
273 Set<DeviceId> spines,
274 Topology topology)
wu5f6c5b82017-08-04 16:45:19 +0800275 throws FlowRuleGeneratorException;
276
277 /**
Carmelo Cascone5167f322017-11-21 21:58:50 -0800278 * Generates a list of flow rules for the given spine switch, destination
279 * hosts and topology.
wu5f6c5b82017-08-04 16:45:19 +0800280 *
281 * @param deviceId a spine device id
282 * @param dstHosts a collection of destination hosts
283 * @param topology a topology
284 * @return a list of flow rules
285 * @throws FlowRuleGeneratorException if flow rules cannot be generated
286 */
Carmelo Cascone5167f322017-11-21 21:58:50 -0800287 public abstract List<FlowRule> generateSpineRules(DeviceId deviceId,
288 Set<Host> dstHosts,
289 Topology topology)
wu5f6c5b82017-08-04 16:45:19 +0800290 throws FlowRuleGeneratorException;
291
292 private void deployAllDevices() {
293 if (otherAppFound && otherApp.appActive) {
294 log.info("Deactivating other app...");
295 appService.deactivate(otherApp.appId);
296 try {
297 Thread.sleep(CLEANUP_SLEEP);
298 } catch (InterruptedException e) {
299 log.warn("Cleanup sleep interrupted!");
300 Thread.interrupted();
301 }
302 }
303
304 Stream.concat(leafSwitches.stream(), spineSwitches.stream())
305 .map(deviceService::getDevice)
306 .forEach(device -> spawnTask(() -> deployDevice(device)));
307 }
308
Carmelo Cascone6e854042017-09-11 21:37:53 +0200309 private boolean matchPipeconf(PiPipeconfId piPipeconfId) {
310 return appPipeconfs.stream()
311 .anyMatch(p -> p.id().equals(piPipeconfId));
312 }
313
wu5f6c5b82017-08-04 16:45:19 +0800314 /**
315 * Executes a device deploy.
316 *
317 * @param device a device
318 */
Carmelo Cascone5167f322017-11-21 21:58:50 -0800319 private void deployDevice(Device device) {
wu5f6c5b82017-08-04 16:45:19 +0800320
321 DeviceId deviceId = device.id();
322
323 // Synchronize executions over the same device.
324 Lock lock = deviceLocks.computeIfAbsent(deviceId, k -> new ReentrantLock());
325 lock.lock();
326
327 try {
Carmelo Cascone3929cc82017-09-06 13:34:25 +0200328 // Set pipeconf flag if not already done.
wu5f6c5b82017-08-04 16:45:19 +0800329 if (!pipeconfFlags.getOrDefault(deviceId, false)) {
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200330 if (piPipeconfService.ofDevice(deviceId).isPresent() &&
Carmelo Cascone6e854042017-09-11 21:37:53 +0200331 matchPipeconf(piPipeconfService.ofDevice(deviceId).get())) {
wu5f6c5b82017-08-04 16:45:19 +0800332 pipeconfFlags.put(device.id(), true);
333 } else {
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200334 log.warn("Wrong pipeconf for {}, expecting {}, but found {}, aborting deploy",
Carmelo Cascone5167f322017-11-21 21:58:50 -0800335 deviceId, Arrays.toString(appPipeconfs.toArray()),
Carmelo Cascone6e854042017-09-11 21:37:53 +0200336 piPipeconfService.ofDevice(deviceId).get());
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200337 return;
wu5f6c5b82017-08-04 16:45:19 +0800338 }
339 }
340
341 // Initialize device.
342 if (!initDevice(deviceId)) {
343 log.warn("Failed to initialize device {}", deviceId);
344 }
345
346 // Install rules.
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200347 if (!ruleFlags.getOrDefault(deviceId, false) &&
348 deviceFlowRules.containsKey(deviceId)) {
Carmelo Cascone5167f322017-11-21 21:58:50 -0800349 log.info("Installing {} rules for {}...",
350 deviceFlowRules.get(deviceId).size(), deviceId);
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200351 installFlowRules(deviceFlowRules.get(deviceId));
352 ruleFlags.put(deviceId, true);
wu5f6c5b82017-08-04 16:45:19 +0800353 }
354 } finally {
355 lock.unlock();
356 }
357 }
358
359 private void spawnTask(Runnable task) {
360 executorService.execute(task);
361 }
362
363
364 private void installFlowRules(Collection<FlowRule> rules) {
365 FlowRuleOperations.Builder opsBuilder = FlowRuleOperations.builder();
366 rules.forEach(opsBuilder::add);
367 flowRuleService.apply(opsBuilder.build());
368 }
369
wu5f6c5b82017-08-04 16:45:19 +0800370 /**
Carmelo Cascone5167f322017-11-21 21:58:50 -0800371 * Generates flow rules to provide host-to-host connectivity for the given
372 * topology and hosts.
wu5f6c5b82017-08-04 16:45:19 +0800373 */
Carmelo Cascone3929cc82017-09-06 13:34:25 +0200374 private synchronized void checkTopologyAndGenerateFlowRules() {
375
376 Topology topo = topologyService.currentTopology();
377 Set<Host> hosts = Sets.newHashSet(hostService.getHosts());
wu5f6c5b82017-08-04 16:45:19 +0800378
379 if (flowRuleGenerated) {
380 log.debug("Flow rules have been already generated, aborting...");
381 return;
382 }
383
384 log.debug("Starting flow rules generator...");
385
386 TopologyGraph graph = topologyService.getGraph(topo);
387 Set<DeviceId> spines = Sets.newHashSet();
388 Set<DeviceId> leafs = Sets.newHashSet();
389 graph.getVertexes().stream()
390 .map(TopologyVertex::deviceId)
391 .forEach(did -> (isSpine(did, topo) ? spines : leafs).add(did));
392
Carmelo Cascone3929cc82017-09-06 13:34:25 +0200393 if (spines.size() != TOPO_SIZE || leafs.size() != TOPO_SIZE) {
Carmelo Cascone5167f322017-11-21 21:58:50 -0800394 log.info("Invalid leaf/spine count, aborting... > leafCount={}, spineCount={}",
wu5f6c5b82017-08-04 16:45:19 +0800395 spines.size(), leafs.size());
396 return;
397 }
398
399 for (DeviceId did : spines) {
400 int portCount = deviceService.getPorts(did).size();
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200401 // Expected port count: num leafs + 1 redundant leaf link (if imbalanced)
Carmelo Cascone3929cc82017-09-06 13:34:25 +0200402 if (portCount != HASHED_LINKS) {
Carmelo Cascone5167f322017-11-21 21:58:50 -0800403 log.info("Invalid port count for spine, aborting... > deviceId={}, portCount={}",
404 did, portCount);
wu5f6c5b82017-08-04 16:45:19 +0800405 return;
406 }
407 }
408 for (DeviceId did : leafs) {
409 int portCount = deviceService.getPorts(did).size();
410 // Expected port count: num spines + host port + 1 redundant spine link
Carmelo Cascone3929cc82017-09-06 13:34:25 +0200411 if (portCount != HASHED_LINKS + 1) {
Carmelo Cascone5167f322017-11-21 21:58:50 -0800412 log.info("Invalid port count for leaf, aborting... > deviceId={}, portCount={}",
413 did, portCount);
wu5f6c5b82017-08-04 16:45:19 +0800414 return;
415 }
416 }
417
418 // Check hosts, number and exactly one per leaf
419 Map<DeviceId, Host> hostMap = Maps.newHashMap();
420 hosts.forEach(h -> hostMap.put(h.location().deviceId(), h));
Carmelo Cascone3929cc82017-09-06 13:34:25 +0200421 if (hosts.size() != TOPO_SIZE || !leafs.equals(hostMap.keySet())) {
Carmelo Cascone5167f322017-11-21 21:58:50 -0800422 log.info("Wrong host configuration, aborting... > hostCount={}, hostMapz={}",
423 hosts.size(), hostMap);
wu5f6c5b82017-08-04 16:45:19 +0800424 return;
425 }
426
427 List<FlowRule> newFlowRules = Lists.newArrayList();
428
429 try {
430 for (DeviceId deviceId : leafs) {
431 Host srcHost = hostMap.get(deviceId);
Carmelo Cascone5167f322017-11-21 21:58:50 -0800432 Set<Host> dstHosts = hosts.stream()
433 .filter(h -> h != srcHost)
434 .collect(toSet());
435 newFlowRules.addAll(generateLeafRules(deviceId, srcHost,
436 dstHosts, spines, topo));
wu5f6c5b82017-08-04 16:45:19 +0800437 }
438 for (DeviceId deviceId : spines) {
439 newFlowRules.addAll(generateSpineRules(deviceId, hosts, topo));
440 }
441 } catch (FlowRuleGeneratorException e) {
Carmelo Cascone5167f322017-11-21 21:58:50 -0800442 log.warn("Exception while executing flow rule generator: {}",
443 e.getMessage());
wu5f6c5b82017-08-04 16:45:19 +0800444 return;
445 }
446
447 if (newFlowRules.size() == 0) {
448 // Something went wrong
449 log.error("0 flow rules generated, BUG?");
450 return;
451 }
452
453 // All good!
454 // Divide flow rules per device id...
Carmelo Cascone5167f322017-11-21 21:58:50 -0800455 ImmutableMap.Builder<DeviceId, List<FlowRule>> mapBuilder =
456 ImmutableMap.builder();
wu5f6c5b82017-08-04 16:45:19 +0800457 concat(spines.stream(), leafs.stream())
Carmelo Cascone5167f322017-11-21 21:58:50 -0800458 .map(deviceId -> ImmutableList.copyOf(
459 newFlowRules.stream()
460 .filter(fr -> fr.deviceId().equals(deviceId))
461 .iterator()))
wu5f6c5b82017-08-04 16:45:19 +0800462 .forEach(frs -> mapBuilder.put(frs.get(0).deviceId(), frs));
463 this.deviceFlowRules = mapBuilder.build();
464
465 this.leafSwitches = ImmutableSet.copyOf(leafs);
466 this.spineSwitches = ImmutableSet.copyOf(spines);
467
468 // Avoid other executions to modify the generated flow rules.
469 flowRuleGenerated = true;
470
Carmelo Cascone5167f322017-11-21 21:58:50 -0800471 log.info("Generated {} flow rules for {} devices",
472 newFlowRules.size(), spines.size() + leafs.size());
wu5f6c5b82017-08-04 16:45:19 +0800473
474 spawnTask(this::deployAllDevices);
475 }
476
477 /**
478 * Returns a new, pre-configured flow rule builder.
479 *
Carmelo Casconeca94bcf2017-10-27 14:16:59 -0700480 * @param did a device id
481 * @param tableId a table id
wu5f6c5b82017-08-04 16:45:19 +0800482 * @return a new flow rule builder
483 */
Carmelo Cascone5167f322017-11-21 21:58:50 -0800484 protected FlowRule.Builder flowRuleBuilder(DeviceId did, PiTableId tableId)
485 throws FlowRuleGeneratorException {
wu5f6c5b82017-08-04 16:45:19 +0800486
Carmelo Cascone1fb27d32017-08-25 20:40:20 +0200487 final Device device = deviceService.getDevice(did);
488 if (!device.is(PiPipelineInterpreter.class)) {
Carmelo Cascone5167f322017-11-21 21:58:50 -0800489 throw new FlowRuleGeneratorException(format(
490 "Device %s has no PiPipelineInterpreter", did));
wu5f6c5b82017-08-04 16:45:19 +0800491 }
wu5f6c5b82017-08-04 16:45:19 +0800492
493 return DefaultFlowRule.builder()
494 .forDevice(did)
Carmelo Cascone03f343d2017-11-13 16:54:39 -0800495 .forTable(tableId)
wu5f6c5b82017-08-04 16:45:19 +0800496 .fromApp(appId)
497 .withPriority(FLOW_PRIORITY)
498 .makePermanent();
499 }
500
501 private List<Port> getHostPorts(DeviceId deviceId, Topology topology) {
502 // Get all non-fabric ports.
503 return deviceService
504 .getPorts(deviceId)
505 .stream()
506 .filter(p -> !isFabricPort(p, topology))
507 .collect(Collectors.toList());
508 }
509
510 private boolean isSpine(DeviceId deviceId, Topology topology) {
511 // True if all ports are fabric.
512 return getHostPorts(deviceId, topology).size() == 0;
513 }
514
515 protected boolean isFabricPort(Port port, Topology topology) {
516 // True if the port connects this device to another infrastructure device.
Carmelo Cascone5167f322017-11-21 21:58:50 -0800517 return topologyService.isInfrastructure(
518 topology, new ConnectPoint(port.element().id(), port.number()));
wu5f6c5b82017-08-04 16:45:19 +0800519 }
520
521 /**
Carmelo Cascone5167f322017-11-21 21:58:50 -0800522 * A listener of device events that executes a device deploy task each time
523 * a device is added, updated or re-connects.
wu5f6c5b82017-08-04 16:45:19 +0800524 */
525 private class InternalDeviceListener implements DeviceListener {
526 @Override
527 public void event(DeviceEvent event) {
528 spawnTask(() -> deployDevice(event.subject()));
529 }
530
531 @Override
532 public boolean isRelevant(DeviceEvent event) {
533 return !appFreezed &&
534 (event.type() == DEVICE_ADDED ||
535 event.type() == DEVICE_UPDATED ||
536 (event.type() == DEVICE_AVAILABILITY_CHANGED &&
537 deviceService.isAvailable(event.subject().id())));
538 }
539 }
540
541 /**
wu5f6c5b82017-08-04 16:45:19 +0800542 * An exception occurred while generating flow rules for this fabric.
543 */
Carmelo Cascone5167f322017-11-21 21:58:50 -0800544 public static class FlowRuleGeneratorException extends Exception {
wu5f6c5b82017-08-04 16:45:19 +0800545
546 public FlowRuleGeneratorException() {
547 }
548
Carmelo Cascone5167f322017-11-21 21:58:50 -0800549 FlowRuleGeneratorException(String msg) {
wu5f6c5b82017-08-04 16:45:19 +0800550 super(msg);
551 }
wu5f6c5b82017-08-04 16:45:19 +0800552 }
Carmelo Cascone87892e22017-11-13 16:01:29 -0800553}