blob: 95e264d4280f7874233ea352b87c2f691fa5a50b [file] [log] [blame]
Charles Chanc91c8782016-03-30 17:54:24 -07001/*
Brian O'Connor0947d7e2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Charles Chanc91c8782016-03-30 17:54:24 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.segmentrouting;
18
19import com.google.common.collect.ImmutableSet;
20import com.google.common.collect.Lists;
Pier Luigi91573e12018-01-23 16:06:38 +010021import com.google.common.collect.Maps;
Charles Chanc91c8782016-03-30 17:54:24 -070022import com.google.common.collect.Sets;
23import org.onlab.packet.Ethernet;
24import org.onlab.packet.IpAddress;
25import org.onlab.packet.IpPrefix;
26import org.onlab.packet.MacAddress;
27import org.onlab.packet.VlanId;
28import org.onlab.util.KryoNamespace;
Pier Luigi580fd8a2018-01-16 10:47:50 +010029import org.onosproject.cluster.NodeId;
Charles Chanc91c8782016-03-30 17:54:24 -070030import org.onosproject.core.ApplicationId;
31import org.onosproject.core.CoreService;
Ray Milkeyae0068a2017-08-15 11:02:29 -070032import org.onosproject.net.config.basics.McastConfig;
Charles Chanc91c8782016-03-30 17:54:24 -070033import org.onosproject.net.ConnectPoint;
34import org.onosproject.net.DeviceId;
35import org.onosproject.net.Link;
36import org.onosproject.net.Path;
37import org.onosproject.net.PortNumber;
38import org.onosproject.net.flow.DefaultTrafficSelector;
39import org.onosproject.net.flow.DefaultTrafficTreatment;
40import org.onosproject.net.flow.TrafficSelector;
41import org.onosproject.net.flow.TrafficTreatment;
42import org.onosproject.net.flow.criteria.Criteria;
43import org.onosproject.net.flow.instructions.Instructions.OutputInstruction;
44import org.onosproject.net.flowobjective.DefaultFilteringObjective;
45import org.onosproject.net.flowobjective.DefaultForwardingObjective;
46import org.onosproject.net.flowobjective.DefaultNextObjective;
Charles Chan72779502016-04-23 17:36:10 -070047import org.onosproject.net.flowobjective.DefaultObjectiveContext;
Charles Chanc91c8782016-03-30 17:54:24 -070048import org.onosproject.net.flowobjective.FilteringObjective;
49import org.onosproject.net.flowobjective.ForwardingObjective;
50import org.onosproject.net.flowobjective.NextObjective;
Charles Chan72779502016-04-23 17:36:10 -070051import org.onosproject.net.flowobjective.ObjectiveContext;
Charles Chanc91c8782016-03-30 17:54:24 -070052import org.onosproject.net.mcast.McastEvent;
Pier Luigi35dab3f2018-01-25 16:16:02 +010053import org.onosproject.net.mcast.McastRoute;
Charles Chanc91c8782016-03-30 17:54:24 -070054import org.onosproject.net.mcast.McastRouteInfo;
Pier Luigid8a15162018-02-15 16:33:08 +010055import org.onosproject.net.topology.Topology;
Charles Chanc91c8782016-03-30 17:54:24 -070056import org.onosproject.net.topology.TopologyService;
Pier Luigi51ee7c02018-02-23 19:57:40 +010057import org.onosproject.segmentrouting.config.DeviceConfigNotFoundException;
Charles Chan370a65b2016-05-10 17:29:47 -070058import org.onosproject.segmentrouting.config.SegmentRoutingAppConfig;
Charles Chan72779502016-04-23 17:36:10 -070059import org.onosproject.segmentrouting.storekey.McastStoreKey;
Charles Chanc91c8782016-03-30 17:54:24 -070060import org.onosproject.store.serializers.KryoNamespaces;
61import org.onosproject.store.service.ConsistentMap;
62import org.onosproject.store.service.Serializer;
63import org.onosproject.store.service.StorageService;
Pier Luigi580fd8a2018-01-16 10:47:50 +010064import org.onosproject.store.service.Versioned;
Charles Chanc91c8782016-03-30 17:54:24 -070065import org.slf4j.Logger;
66import org.slf4j.LoggerFactory;
67
Pier Luigi35dab3f2018-01-25 16:16:02 +010068import java.time.Instant;
Charles Chanc91c8782016-03-30 17:54:24 -070069import java.util.Collection;
70import java.util.Collections;
Pier Luigi91573e12018-01-23 16:06:38 +010071import java.util.Comparator;
Charles Chanc91c8782016-03-30 17:54:24 -070072import java.util.List;
Charles Chan72779502016-04-23 17:36:10 -070073import java.util.Map;
Charles Chanc91c8782016-03-30 17:54:24 -070074import java.util.Optional;
75import java.util.Set;
Pier Luigi35dab3f2018-01-25 16:16:02 +010076import java.util.concurrent.ScheduledExecutorService;
77import java.util.concurrent.TimeUnit;
78import java.util.concurrent.locks.Lock;
79import java.util.concurrent.locks.ReentrantLock;
Charles Chan72779502016-04-23 17:36:10 -070080import java.util.stream.Collectors;
81
82import static com.google.common.base.Preconditions.checkState;
Pier Luigi35dab3f2018-01-25 16:16:02 +010083import static java.util.concurrent.Executors.newScheduledThreadPool;
84import static org.onlab.util.Tools.groupedThreads;
Charles Chan10b0fb72017-02-02 16:20:42 -080085import static org.onosproject.segmentrouting.SegmentRoutingManager.INTERNAL_VLAN;
Charles Chanc91c8782016-03-30 17:54:24 -070086
87/**
Charles Chan1eaf4802016-04-18 13:44:03 -070088 * Handles multicast-related events.
Charles Chanc91c8782016-03-30 17:54:24 -070089 */
Charles Chan1eaf4802016-04-18 13:44:03 -070090public class McastHandler {
91 private static final Logger log = LoggerFactory.getLogger(McastHandler.class);
Charles Chanc91c8782016-03-30 17:54:24 -070092 private final SegmentRoutingManager srManager;
93 private final ApplicationId coreAppId;
Charles Chan82f19972016-05-17 13:13:55 -070094 private final StorageService storageService;
95 private final TopologyService topologyService;
Charles Chan72779502016-04-23 17:36:10 -070096 private final ConsistentMap<McastStoreKey, NextObjective> mcastNextObjStore;
97 private final KryoNamespace.Builder mcastKryo;
98 private final ConsistentMap<McastStoreKey, McastRole> mcastRoleStore;
99
Pier Luigi35dab3f2018-01-25 16:16:02 +0100100 // Mcast lock to serialize local operations
101 private final Lock mcastLock = new ReentrantLock();
102
103 /**
104 * Acquires the lock used when making mcast changes.
105 */
106 private void mcastLock() {
107 mcastLock.lock();
108 }
109
110 /**
111 * Releases the lock used when making mcast changes.
112 */
113 private void mcastUnlock() {
114 mcastLock.unlock();
115 }
116
117 // Stability threshold for Mcast. Seconds
118 private static final long MCAST_STABLITY_THRESHOLD = 5;
119 // Last change done
120 private Instant lastMcastChange = Instant.now();
121
122 /**
123 * Determines if mcast in the network has been stable in the last
124 * MCAST_STABLITY_THRESHOLD seconds, by comparing the current time
125 * to the last mcast change timestamp.
126 *
127 * @return true if stable
128 */
129 private boolean isMcastStable() {
130 long last = (long) (lastMcastChange.toEpochMilli() / 1000.0);
131 long now = (long) (Instant.now().toEpochMilli() / 1000.0);
132 log.debug("Mcast stable since {}s", now - last);
133 return (now - last) > MCAST_STABLITY_THRESHOLD;
134 }
135
136 // Verify interval for Mcast
137 private static final long MCAST_VERIFY_INTERVAL = 30;
138
139 // Executor for mcast bucket corrector
140 private ScheduledExecutorService executorService
141 = newScheduledThreadPool(1, groupedThreads("mcastBktCorrector", "mcastbktC-%d", log));
142
Charles Chan72779502016-04-23 17:36:10 -0700143 /**
144 * Role in the multicast tree.
145 */
146 public enum McastRole {
147 /**
148 * The device is the ingress device of this group.
149 */
150 INGRESS,
151 /**
152 * The device is the transit device of this group.
153 */
154 TRANSIT,
155 /**
156 * The device is the egress device of this group.
157 */
158 EGRESS
159 }
Charles Chanc91c8782016-03-30 17:54:24 -0700160
161 /**
162 * Constructs the McastEventHandler.
163 *
164 * @param srManager Segment Routing manager
165 */
Charles Chan1eaf4802016-04-18 13:44:03 -0700166 public McastHandler(SegmentRoutingManager srManager) {
Charles Chanc91c8782016-03-30 17:54:24 -0700167 coreAppId = srManager.coreService.getAppId(CoreService.CORE_APP_NAME);
Charles Chanc91c8782016-03-30 17:54:24 -0700168 this.srManager = srManager;
169 this.storageService = srManager.storageService;
170 this.topologyService = srManager.topologyService;
Charles Chan72779502016-04-23 17:36:10 -0700171 mcastKryo = new KryoNamespace.Builder()
Charles Chanc91c8782016-03-30 17:54:24 -0700172 .register(KryoNamespaces.API)
Charles Chan72779502016-04-23 17:36:10 -0700173 .register(McastStoreKey.class)
174 .register(McastRole.class);
Charles Chanc91c8782016-03-30 17:54:24 -0700175 mcastNextObjStore = storageService
Charles Chan72779502016-04-23 17:36:10 -0700176 .<McastStoreKey, NextObjective>consistentMapBuilder()
Charles Chanc91c8782016-03-30 17:54:24 -0700177 .withName("onos-mcast-nextobj-store")
Charles Chan4922a172016-05-23 16:45:45 -0700178 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-NextObj")))
Charles Chanc91c8782016-03-30 17:54:24 -0700179 .build();
Charles Chan72779502016-04-23 17:36:10 -0700180 mcastRoleStore = storageService
181 .<McastStoreKey, McastRole>consistentMapBuilder()
182 .withName("onos-mcast-role-store")
Charles Chan4922a172016-05-23 16:45:45 -0700183 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-Role")))
Charles Chan72779502016-04-23 17:36:10 -0700184 .build();
Pier Luigi35dab3f2018-01-25 16:16:02 +0100185 // Init the executor service and the buckets corrector
186 executorService.scheduleWithFixedDelay(new McastBucketCorrector(), 10,
187 MCAST_VERIFY_INTERVAL,
188 TimeUnit.SECONDS);
Charles Chan72779502016-04-23 17:36:10 -0700189 }
190
191 /**
192 * Read initial multicast from mcast store.
193 */
Charles Chan82f19972016-05-17 13:13:55 -0700194 protected void init() {
Charles Chan72779502016-04-23 17:36:10 -0700195 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
196 ConnectPoint source = srManager.multicastRouteService.fetchSource(mcastRoute);
197 Set<ConnectPoint> sinks = srManager.multicastRouteService.fetchSinks(mcastRoute);
198 sinks.forEach(sink -> {
199 processSinkAddedInternal(source, sink, mcastRoute.group());
200 });
201 });
Charles Chanc91c8782016-03-30 17:54:24 -0700202 }
203
204 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +0100205 * Clean up when deactivating the application.
206 */
207 protected void terminate() {
208 executorService.shutdown();
209 }
210
211 /**
Charles Chanc91c8782016-03-30 17:54:24 -0700212 * Processes the SOURCE_ADDED event.
213 *
214 * @param event McastEvent with SOURCE_ADDED type
215 */
216 protected void processSourceAdded(McastEvent event) {
217 log.info("processSourceAdded {}", event);
218 McastRouteInfo mcastRouteInfo = event.subject();
219 if (!mcastRouteInfo.isComplete()) {
220 log.info("Incompleted McastRouteInfo. Abort.");
221 return;
222 }
223 ConnectPoint source = mcastRouteInfo.source().orElse(null);
224 Set<ConnectPoint> sinks = mcastRouteInfo.sinks();
225 IpAddress mcastIp = mcastRouteInfo.route().group();
226
Pier Luigi35dab3f2018-01-25 16:16:02 +0100227 sinks.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp));
Charles Chanc91c8782016-03-30 17:54:24 -0700228 }
229
230 /**
Pier Luigie80d6b42018-02-26 12:31:38 +0100231 * Processes the SOURCE_UPDATED event.
232 *
233 * @param event McastEvent with SOURCE_UPDATED type
234 */
235 protected void processSourceUpdated(McastEvent event) {
236 log.info("processSourceUpdated {}", event);
237 // Get old and new data
238 McastRouteInfo mcastRouteInfo = event.subject();
239 ConnectPoint newSource = mcastRouteInfo.source().orElse(null);
240 mcastRouteInfo = event.prevSubject();
241 ConnectPoint oldSource = mcastRouteInfo.source().orElse(null);
242 // and group ip
243 IpAddress mcastIp = mcastRouteInfo.route().group();
244 // Process the update event
245 processSourceUpdatedInternal(mcastIp, newSource, oldSource);
246 }
247
248 /**
Charles Chanc91c8782016-03-30 17:54:24 -0700249 * Processes the SINK_ADDED event.
250 *
251 * @param event McastEvent with SINK_ADDED type
252 */
253 protected void processSinkAdded(McastEvent event) {
254 log.info("processSinkAdded {}", event);
255 McastRouteInfo mcastRouteInfo = event.subject();
256 if (!mcastRouteInfo.isComplete()) {
257 log.info("Incompleted McastRouteInfo. Abort.");
258 return;
259 }
260 ConnectPoint source = mcastRouteInfo.source().orElse(null);
261 ConnectPoint sink = mcastRouteInfo.sink().orElse(null);
262 IpAddress mcastIp = mcastRouteInfo.route().group();
263
264 processSinkAddedInternal(source, sink, mcastIp);
265 }
266
267 /**
268 * Processes the SINK_REMOVED event.
269 *
270 * @param event McastEvent with SINK_REMOVED type
271 */
272 protected void processSinkRemoved(McastEvent event) {
273 log.info("processSinkRemoved {}", event);
274 McastRouteInfo mcastRouteInfo = event.subject();
275 if (!mcastRouteInfo.isComplete()) {
276 log.info("Incompleted McastRouteInfo. Abort.");
277 return;
278 }
279 ConnectPoint source = mcastRouteInfo.source().orElse(null);
280 ConnectPoint sink = mcastRouteInfo.sink().orElse(null);
281 IpAddress mcastIp = mcastRouteInfo.route().group();
Charles Chanc91c8782016-03-30 17:54:24 -0700282
Pier Luigi35dab3f2018-01-25 16:16:02 +0100283 processSinkRemovedInternal(source, sink, mcastIp);
284 }
Charles Chan0932eca2016-06-28 16:50:13 -0700285
Pier Luigi35dab3f2018-01-25 16:16:02 +0100286 /**
Pier Luigi6786b922018-02-02 16:19:11 +0100287 * Processes the ROUTE_REMOVED event.
288 *
289 * @param event McastEvent with ROUTE_REMOVED type
290 */
291 protected void processRouteRemoved(McastEvent event) {
292 log.info("processRouteRemoved {}", event);
293 McastRouteInfo mcastRouteInfo = event.subject();
294 if (!mcastRouteInfo.source().isPresent()) {
295 log.info("Incompleted McastRouteInfo. Abort.");
296 return;
297 }
298 // Get group ip and ingress connect point
299 IpAddress mcastIp = mcastRouteInfo.route().group();
Pier Luigie80d6b42018-02-26 12:31:38 +0100300 ConnectPoint source = mcastRouteInfo.source().orElse(null);
Pier Luigi6786b922018-02-02 16:19:11 +0100301
302 processRouteRemovedInternal(source, mcastIp);
303 }
304
305 /**
Pier Luigie80d6b42018-02-26 12:31:38 +0100306 * Process the SOURCE_UPDATED event.
307 *
308 * @param newSource the updated srouce info
309 * @param oldSource the outdated source info
310 */
311 private void processSourceUpdatedInternal(IpAddress mcastIp,
312 ConnectPoint newSource,
313 ConnectPoint oldSource) {
314 lastMcastChange = Instant.now();
315 mcastLock();
316 try {
317 log.debug("Processing source updated for group {}", mcastIp);
318
319 // Build key for the store and retrieve old data
320 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, oldSource.deviceId());
321
322 // Verify leadership on the operation
323 if (!isLeader(oldSource)) {
324 log.debug("Skip {} due to lack of leadership", mcastIp);
325 return;
326 }
327
328 // This device is not serving this multicast group
329 if (!mcastRoleStore.containsKey(mcastStoreKey) ||
330 !mcastNextObjStore.containsKey(mcastStoreKey)) {
331 log.warn("{} is not serving {}. Abort.", oldSource.deviceId(), mcastIp);
332 return;
333 }
334 NextObjective nextObjective = mcastNextObjStore.get(mcastStoreKey).value();
335 Set<PortNumber> outputPorts = getPorts(nextObjective.next());
336
337 // Let's remove old flows and groups
338 removeGroupFromDevice(oldSource.deviceId(), mcastIp, assignedVlan(oldSource));
339 // Push new flows and group
340 outputPorts.forEach(portNumber -> addPortToDevice(newSource.deviceId(), portNumber,
341 mcastIp, assignedVlan(newSource)));
342 addFilterToDevice(newSource.deviceId(), newSource.port(),
343 assignedVlan(newSource), mcastIp);
344 // Setup mcast roles
345 mcastRoleStore.put(new McastStoreKey(mcastIp, newSource.deviceId()),
346 McastRole.INGRESS);
347 } finally {
348 mcastUnlock();
349 }
350 }
351
352 /**
Pier Luigi6786b922018-02-02 16:19:11 +0100353 * Removes the entire mcast tree related to this group.
354 *
355 * @param mcastIp multicast group IP address
356 */
357 private void processRouteRemovedInternal(ConnectPoint source, IpAddress mcastIp) {
358 lastMcastChange = Instant.now();
359 mcastLock();
360 try {
Pier Luigie80d6b42018-02-26 12:31:38 +0100361 log.debug("Processing route removed for group {}", mcastIp);
Pier Luigi6786b922018-02-02 16:19:11 +0100362
363 // Find out the ingress, transit and egress device of the affected group
364 DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
365 .stream().findAny().orElse(null);
366 DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT)
367 .stream().findAny().orElse(null);
368 Set<DeviceId> egressDevices = getDevice(mcastIp, McastRole.EGRESS);
369
370 // Verify leadership on the operation
371 if (!isLeader(source)) {
372 log.debug("Skip {} due to lack of leadership", mcastIp);
373 return;
374 }
375
376 // If there are egress devices, sinks could be only on the ingress
377 if (!egressDevices.isEmpty()) {
378 egressDevices.forEach(
379 deviceId -> removeGroupFromDevice(deviceId, mcastIp, assignedVlan(null))
380 );
381 }
382 // Transit could be null
383 if (transitDevice != null) {
384 removeGroupFromDevice(transitDevice, mcastIp, assignedVlan(null));
385 }
386 // Ingress device should be not null
387 if (ingressDevice != null) {
388 removeGroupFromDevice(ingressDevice, mcastIp, assignedVlan(source));
389 }
Pier Luigi6786b922018-02-02 16:19:11 +0100390 } finally {
391 mcastUnlock();
392 }
393 }
394
395 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +0100396 * Removes a path from source to sink for given multicast group.
397 *
398 * @param source connect point of the multicast source
399 * @param sink connection point of the multicast sink
400 * @param mcastIp multicast group IP address
401 */
402 private void processSinkRemovedInternal(ConnectPoint source, ConnectPoint sink,
403 IpAddress mcastIp) {
404 lastMcastChange = Instant.now();
405 mcastLock();
406 try {
Pier Luigi6786b922018-02-02 16:19:11 +0100407 // Verify leadership on the operation
408 if (!isLeader(source)) {
409 log.debug("Skip {} due to lack of leadership", mcastIp);
Charles Chanc91c8782016-03-30 17:54:24 -0700410 return;
411 }
Charles Chanc91c8782016-03-30 17:54:24 -0700412
Pier Luigi35dab3f2018-01-25 16:16:02 +0100413 // When source and sink are on the same device
414 if (source.deviceId().equals(sink.deviceId())) {
415 // Source and sink are on even the same port. There must be something wrong.
416 if (source.port().equals(sink.port())) {
417 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
418 mcastIp, sink, source);
419 return;
420 }
421 removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(source));
422 return;
423 }
Charles Chanc91c8782016-03-30 17:54:24 -0700424
Pier Luigi35dab3f2018-01-25 16:16:02 +0100425 // Process the egress device
426 boolean isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(null));
427 if (isLast) {
428 mcastRoleStore.remove(new McastStoreKey(mcastIp, sink.deviceId()));
429 }
430
431 // If this is the last sink on the device, also update upstream
432 Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp);
433 if (mcastPath.isPresent()) {
434 List<Link> links = Lists.newArrayList(mcastPath.get().links());
435 Collections.reverse(links);
436 for (Link link : links) {
437 if (isLast) {
438 isLast = removePortFromDevice(
439 link.src().deviceId(),
440 link.src().port(),
441 mcastIp,
442 assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null)
443 );
444 mcastRoleStore.remove(new McastStoreKey(mcastIp, link.src().deviceId()));
445 }
Charles Chanc91c8782016-03-30 17:54:24 -0700446 }
447 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100448 } finally {
449 mcastUnlock();
Charles Chanc91c8782016-03-30 17:54:24 -0700450 }
451 }
452
453 /**
454 * Establishes a path from source to sink for given multicast group.
455 *
456 * @param source connect point of the multicast source
457 * @param sink connection point of the multicast sink
458 * @param mcastIp multicast group IP address
459 */
460 private void processSinkAddedInternal(ConnectPoint source, ConnectPoint sink,
461 IpAddress mcastIp) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100462 lastMcastChange = Instant.now();
463 mcastLock();
464 try {
465 // Continue only when this instance is the master of source device
466 if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
467 log.debug("Skip {} due to lack of mastership of the source device {}",
468 mcastIp, source.deviceId());
Charles Chanc91c8782016-03-30 17:54:24 -0700469 return;
470 }
Charles Chanc91c8782016-03-30 17:54:24 -0700471
Pier Luigi35dab3f2018-01-25 16:16:02 +0100472 // Process the ingress device
473 addFilterToDevice(source.deviceId(), source.port(), assignedVlan(source), mcastIp);
Charles Chan72779502016-04-23 17:36:10 -0700474
Pier Luigi35dab3f2018-01-25 16:16:02 +0100475 // When source and sink are on the same device
476 if (source.deviceId().equals(sink.deviceId())) {
477 // Source and sink are on even the same port. There must be something wrong.
478 if (source.port().equals(sink.port())) {
479 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
480 mcastIp, sink, source);
481 return;
482 }
483 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(source));
484 mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()), McastRole.INGRESS);
485 return;
486 }
Charles Chan72779502016-04-23 17:36:10 -0700487
Pier Luigi35dab3f2018-01-25 16:16:02 +0100488 // Find a path. If present, create/update groups and flows for each hop
489 Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp);
490 if (mcastPath.isPresent()) {
491 List<Link> links = mcastPath.get().links();
492 checkState(links.size() == 2,
493 "Path in leaf-spine topology should always be two hops: ", links);
Charles Chan72779502016-04-23 17:36:10 -0700494
Pier Luigi35dab3f2018-01-25 16:16:02 +0100495 links.forEach(link -> {
496 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
497 assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
498 addFilterToDevice(link.dst().deviceId(), link.dst().port(), assignedVlan(null), mcastIp);
499 });
500
501 // Process the egress device
502 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(null));
503
504 // Setup mcast roles
505 mcastRoleStore.put(new McastStoreKey(mcastIp, source.deviceId()),
506 McastRole.INGRESS);
507 mcastRoleStore.put(new McastStoreKey(mcastIp, links.get(0).dst().deviceId()),
508 McastRole.TRANSIT);
509 mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()),
510 McastRole.EGRESS);
511 } else {
512 log.warn("Unable to find a path from {} to {}. Abort sinkAdded",
513 source.deviceId(), sink.deviceId());
514 }
515 } finally {
516 mcastUnlock();
Charles Chanc91c8782016-03-30 17:54:24 -0700517 }
518 }
519
520 /**
Charles Chan72779502016-04-23 17:36:10 -0700521 * Processes the LINK_DOWN event.
522 *
523 * @param affectedLink Link that is going down
524 */
525 protected void processLinkDown(Link affectedLink) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100526 lastMcastChange = Instant.now();
527 mcastLock();
528 try {
529 // Get groups affected by the link down event
530 getAffectedGroups(affectedLink).forEach(mcastIp -> {
531 // TODO Optimize when the group editing is in place
532 log.debug("Processing link down {} for group {}",
533 affectedLink, mcastIp);
Pier Luigi580fd8a2018-01-16 10:47:50 +0100534
Pier Luigi35dab3f2018-01-25 16:16:02 +0100535 // Find out the ingress, transit and egress device of affected group
536 DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
537 .stream().findAny().orElse(null);
538 DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT)
539 .stream().findAny().orElse(null);
540 Set<DeviceId> egressDevices = getDevice(mcastIp, McastRole.EGRESS);
541 ConnectPoint source = getSource(mcastIp);
Charles Chana8f9dee2016-05-16 18:44:13 -0700542
Pier Luigi35dab3f2018-01-25 16:16:02 +0100543 // Do not proceed if any of these info is missing
544 if (ingressDevice == null || transitDevice == null
545 || egressDevices == null || source == null) {
546 log.warn("Missing ingress {}, transit {}, egress {} devices or source {}",
547 ingressDevice, transitDevice, egressDevices, source);
548 return;
Charles Chan72779502016-04-23 17:36:10 -0700549 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100550
551 // Continue only when this instance is the master of source device
552 if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
553 log.debug("Skip {} due to lack of mastership of the source device {}",
554 source.deviceId());
555 return;
556 }
557
558 // Remove entire transit
559 removeGroupFromDevice(transitDevice, mcastIp, assignedVlan(null));
560
561 // Remove transit-facing port on ingress device
562 PortNumber ingressTransitPort = ingressTransitPort(mcastIp);
563 if (ingressTransitPort != null) {
564 removePortFromDevice(ingressDevice, ingressTransitPort, mcastIp, assignedVlan(source));
565 mcastRoleStore.remove(new McastStoreKey(mcastIp, transitDevice));
566 }
567
568 // Construct a new path for each egress device
569 egressDevices.forEach(egressDevice -> {
570 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice, mcastIp);
571 if (mcastPath.isPresent()) {
572 installPath(mcastIp, source, mcastPath.get());
573 } else {
574 log.warn("Fail to recover egress device {} from link failure {}",
575 egressDevice, affectedLink);
576 removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null));
577 }
578 });
Charles Chan72779502016-04-23 17:36:10 -0700579 });
Pier Luigi35dab3f2018-01-25 16:16:02 +0100580 } finally {
581 mcastUnlock();
582 }
Charles Chan72779502016-04-23 17:36:10 -0700583 }
584
585 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +0100586 * Process the DEVICE_DOWN event.
587 *
588 * @param deviceDown device going down
589 */
590 protected void processDeviceDown(DeviceId deviceDown) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100591 lastMcastChange = Instant.now();
592 mcastLock();
593 try {
594 // Get the mcast groups affected by the device going down
595 getAffectedGroups(deviceDown).forEach(mcastIp -> {
596 // TODO Optimize when the group editing is in place
597 log.debug("Processing device down {} for group {}",
598 deviceDown, mcastIp);
Pier Luigi580fd8a2018-01-16 10:47:50 +0100599
Pier Luigi35dab3f2018-01-25 16:16:02 +0100600 // Find out the ingress, transit and egress device of affected group
601 DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
602 .stream().findAny().orElse(null);
603 DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT)
604 .stream().findAny().orElse(null);
605 Set<DeviceId> egressDevices = getDevice(mcastIp, McastRole.EGRESS);
606 ConnectPoint source = getSource(mcastIp);
Pier Luigi580fd8a2018-01-16 10:47:50 +0100607
Pier Luigi35dab3f2018-01-25 16:16:02 +0100608 // Do not proceed if ingress device or source of this group are missing
609 // If sinks are in other leafs, we have ingress, transit, egress, and source
610 // If sinks are in the same leaf, we have just ingress and source
611 if (ingressDevice == null || source == null) {
612 log.warn("Missing ingress {} or source {} for group {}",
613 ingressDevice, source, mcastIp);
Pier Luigi580fd8a2018-01-16 10:47:50 +0100614 return;
615 }
Pier Luigi580fd8a2018-01-16 10:47:50 +0100616
Pier Luigi6786b922018-02-02 16:19:11 +0100617 // Verify leadership on the operation
618 if (!isLeader(source)) {
619 log.debug("Skip {} due to lack of leadership", mcastIp);
620 return;
Pier Luigi580fd8a2018-01-16 10:47:50 +0100621 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100622
623 // If it exists, we have to remove it in any case
624 if (transitDevice != null) {
625 // Remove entire transit
626 removeGroupFromDevice(transitDevice, mcastIp, assignedVlan(null));
627 }
628 // If the ingress is down
629 if (ingressDevice.equals(deviceDown)) {
630 // Remove entire ingress
631 removeGroupFromDevice(ingressDevice, mcastIp, assignedVlan(source));
632 // If other sinks different from the ingress exist
633 if (!egressDevices.isEmpty()) {
634 // Remove all the remaining egress
635 egressDevices.forEach(
636 egressDevice -> removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null))
637 );
Pier Luigi580fd8a2018-01-16 10:47:50 +0100638 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100639 } else {
640 // Egress or transit could be down at this point
641 // Get the ingress-transit port if it exists
642 PortNumber ingressTransitPort = ingressTransitPort(mcastIp);
643 if (ingressTransitPort != null) {
644 // Remove transit-facing port on ingress device
645 removePortFromDevice(ingressDevice, ingressTransitPort, mcastIp, assignedVlan(source));
646 }
647 // One of the egress device is down
648 if (egressDevices.contains(deviceDown)) {
649 // Remove entire device down
650 removeGroupFromDevice(deviceDown, mcastIp, assignedVlan(null));
651 // Remove the device down from egress
652 egressDevices.remove(deviceDown);
653 // If there are no more egress and ingress does not have sinks
654 if (egressDevices.isEmpty() && !hasSinks(ingressDevice, mcastIp)) {
655 // Remove entire ingress
656 mcastRoleStore.remove(new McastStoreKey(mcastIp, ingressDevice));
657 // We have done
658 return;
659 }
660 }
661 // Construct a new path for each egress device
662 egressDevices.forEach(egressDevice -> {
663 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice, mcastIp);
664 // If there is a new path
665 if (mcastPath.isPresent()) {
666 // Let's install the new mcast path for this egress
667 installPath(mcastIp, source, mcastPath.get());
668 } else {
669 // We were not able to find an alternative path for this egress
670 log.warn("Fail to recover egress device {} from device down {}",
671 egressDevice, deviceDown);
672 removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null));
673 }
674 });
675 }
676 });
677 } finally {
678 mcastUnlock();
679 }
Pier Luigi580fd8a2018-01-16 10:47:50 +0100680 }
681
682 /**
Charles Chanc91c8782016-03-30 17:54:24 -0700683 * Adds filtering objective for given device and port.
684 *
685 * @param deviceId device ID
686 * @param port ingress port number
687 * @param assignedVlan assigned VLAN ID
688 */
Julia Fergusonf1d9c342017-08-10 18:15:24 +0000689 private void addFilterToDevice(DeviceId deviceId, PortNumber port, VlanId assignedVlan, IpAddress mcastIp) {
Charles Chanc91c8782016-03-30 17:54:24 -0700690 // Do nothing if the port is configured as suppressed
Charles Chan370a65b2016-05-10 17:29:47 -0700691 ConnectPoint connectPoint = new ConnectPoint(deviceId, port);
692 SegmentRoutingAppConfig appConfig = srManager.cfgService
693 .getConfig(srManager.appId, SegmentRoutingAppConfig.class);
694 if (appConfig != null && appConfig.suppressSubnet().contains(connectPoint)) {
695 log.info("Ignore suppressed port {}", connectPoint);
Charles Chanc91c8782016-03-30 17:54:24 -0700696 return;
697 }
698
699 FilteringObjective.Builder filtObjBuilder =
Julia Fergusonf1d9c342017-08-10 18:15:24 +0000700 filterObjBuilder(deviceId, port, assignedVlan, mcastIp);
Charles Chan72779502016-04-23 17:36:10 -0700701 ObjectiveContext context = new DefaultObjectiveContext(
702 (objective) -> log.debug("Successfully add filter on {}/{}, vlan {}",
Charles Chan10b0fb72017-02-02 16:20:42 -0800703 deviceId, port.toLong(), assignedVlan),
Charles Chan72779502016-04-23 17:36:10 -0700704 (objective, error) ->
705 log.warn("Failed to add filter on {}/{}, vlan {}: {}",
Charles Chan10b0fb72017-02-02 16:20:42 -0800706 deviceId, port.toLong(), assignedVlan, error));
Charles Chan72779502016-04-23 17:36:10 -0700707 srManager.flowObjectiveService.filter(deviceId, filtObjBuilder.add(context));
Charles Chanc91c8782016-03-30 17:54:24 -0700708 }
709
710 /**
711 * Adds a port to given multicast group on given device. This involves the
712 * update of L3 multicast group and multicast routing table entry.
713 *
714 * @param deviceId device ID
715 * @param port port to be added
716 * @param mcastIp multicast group
717 * @param assignedVlan assigned VLAN ID
718 */
719 private void addPortToDevice(DeviceId deviceId, PortNumber port,
720 IpAddress mcastIp, VlanId assignedVlan) {
Charles Chan72779502016-04-23 17:36:10 -0700721 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId);
Charles Chanc91c8782016-03-30 17:54:24 -0700722 ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
Pier Luigi4f0dd212018-01-19 10:24:53 +0100723 NextObjective newNextObj;
Charles Chan72779502016-04-23 17:36:10 -0700724 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Charles Chanc91c8782016-03-30 17:54:24 -0700725 // First time someone request this mcast group via this device
726 portBuilder.add(port);
Pier Luigi4f0dd212018-01-19 10:24:53 +0100727 // New nextObj
728 newNextObj = nextObjBuilder(mcastIp, assignedVlan,
729 portBuilder.build(), null).add();
730 // Store the new port
731 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Charles Chanc91c8782016-03-30 17:54:24 -0700732 } else {
733 // This device already serves some subscribers of this mcast group
Charles Chan72779502016-04-23 17:36:10 -0700734 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chanc91c8782016-03-30 17:54:24 -0700735 // Stop if the port is already in the nextobj
736 Set<PortNumber> existingPorts = getPorts(nextObj.next());
737 if (existingPorts.contains(port)) {
738 log.info("NextObj for {}/{} already exists. Abort", deviceId, port);
739 return;
740 }
Pier Luigi4f0dd212018-01-19 10:24:53 +0100741 // Let's add the port and reuse the previous one
Yuta HIGUCHIbef07b52018-02-09 18:05:23 -0800742 portBuilder.addAll(existingPorts).add(port);
Pier Luigi4f0dd212018-01-19 10:24:53 +0100743 // Reuse previous nextObj
744 newNextObj = nextObjBuilder(mcastIp, assignedVlan,
745 portBuilder.build(), nextObj.id()).addToExisting();
746 // Store the final next objective and send only the difference to the driver
747 mcastNextObjStore.put(mcastStoreKey, newNextObj);
748 // Add just the new port
749 portBuilder = ImmutableSet.builder();
750 portBuilder.add(port);
751 newNextObj = nextObjBuilder(mcastIp, assignedVlan,
752 portBuilder.build(), nextObj.id()).addToExisting();
Charles Chanc91c8782016-03-30 17:54:24 -0700753 }
754 // Create, store and apply the new nextObj and fwdObj
Charles Chan72779502016-04-23 17:36:10 -0700755 ObjectiveContext context = new DefaultObjectiveContext(
756 (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}",
757 mcastIp, deviceId, port.toLong(), assignedVlan),
758 (objective, error) ->
759 log.warn("Failed to add {} on {}/{}, vlan {}: {}",
760 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Charles Chanc91c8782016-03-30 17:54:24 -0700761 ForwardingObjective fwdObj =
Charles Chan72779502016-04-23 17:36:10 -0700762 fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add(context);
Charles Chanc91c8782016-03-30 17:54:24 -0700763 srManager.flowObjectiveService.next(deviceId, newNextObj);
764 srManager.flowObjectiveService.forward(deviceId, fwdObj);
Charles Chanc91c8782016-03-30 17:54:24 -0700765 }
766
767 /**
768 * Removes a port from given multicast group on given device.
769 * This involves the update of L3 multicast group and multicast routing
770 * table entry.
771 *
772 * @param deviceId device ID
773 * @param port port to be added
774 * @param mcastIp multicast group
775 * @param assignedVlan assigned VLAN ID
776 * @return true if this is the last sink on this device
777 */
778 private boolean removePortFromDevice(DeviceId deviceId, PortNumber port,
779 IpAddress mcastIp, VlanId assignedVlan) {
Charles Chan72779502016-04-23 17:36:10 -0700780 McastStoreKey mcastStoreKey =
781 new McastStoreKey(mcastIp, deviceId);
Charles Chanc91c8782016-03-30 17:54:24 -0700782 // This device is not serving this multicast group
Charles Chan72779502016-04-23 17:36:10 -0700783 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Charles Chanc91c8782016-03-30 17:54:24 -0700784 log.warn("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
785 return false;
786 }
Charles Chan72779502016-04-23 17:36:10 -0700787 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chanc91c8782016-03-30 17:54:24 -0700788
789 Set<PortNumber> existingPorts = getPorts(nextObj.next());
Charles Chan72779502016-04-23 17:36:10 -0700790 // This port does not serve this multicast group
Charles Chanc91c8782016-03-30 17:54:24 -0700791 if (!existingPorts.contains(port)) {
792 log.warn("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
793 return false;
794 }
795 // Copy and modify the ImmutableSet
796 existingPorts = Sets.newHashSet(existingPorts);
797 existingPorts.remove(port);
798
799 NextObjective newNextObj;
Pier Luigi8cd46de2018-01-19 10:24:53 +0100800 ObjectiveContext context;
Charles Chanc91c8782016-03-30 17:54:24 -0700801 ForwardingObjective fwdObj;
802 if (existingPorts.isEmpty()) {
Pier Luigi8cd46de2018-01-19 10:24:53 +0100803 // If this is the last sink, remove flows and last bucket
Charles Chanc91c8782016-03-30 17:54:24 -0700804 // NOTE: Rely on GroupStore garbage collection rather than explicitly
805 // remove L3MG since there might be other flows/groups refer to
806 // the same L2IG
Pier Luigi8cd46de2018-01-19 10:24:53 +0100807 context = new DefaultObjectiveContext(
Charles Chan72779502016-04-23 17:36:10 -0700808 (objective) -> log.debug("Successfully remove {} on {}/{}, vlan {}",
809 mcastIp, deviceId, port.toLong(), assignedVlan),
810 (objective, error) ->
811 log.warn("Failed to remove {} on {}/{}, vlan {}: {}",
812 mcastIp, deviceId, port.toLong(), assignedVlan, error));
813 fwdObj = fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
814 mcastNextObjStore.remove(mcastStoreKey);
Charles Chanc91c8782016-03-30 17:54:24 -0700815 } else {
816 // If this is not the last sink, update flows and groups
Pier Luigi8cd46de2018-01-19 10:24:53 +0100817 context = new DefaultObjectiveContext(
Charles Chan72779502016-04-23 17:36:10 -0700818 (objective) -> log.debug("Successfully update {} on {}/{}, vlan {}",
819 mcastIp, deviceId, port.toLong(), assignedVlan),
820 (objective, error) ->
821 log.warn("Failed to update {} on {}/{}, vlan {}: {}",
822 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Pier Luigi8cd46de2018-01-19 10:24:53 +0100823 // Here we store the next objective with the remaining port
824 newNextObj = nextObjBuilder(mcastIp, assignedVlan,
825 existingPorts, nextObj.id()).removeFromExisting();
Charles Chan82f19972016-05-17 13:13:55 -0700826 fwdObj = fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add(context);
Charles Chan72779502016-04-23 17:36:10 -0700827 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Charles Chanc91c8782016-03-30 17:54:24 -0700828 }
Pier Luigi8cd46de2018-01-19 10:24:53 +0100829 // Let's modify the next objective removing the bucket
830 newNextObj = nextObjBuilder(mcastIp, assignedVlan,
831 ImmutableSet.of(port), nextObj.id()).removeFromExisting();
832 srManager.flowObjectiveService.next(deviceId, newNextObj);
833 srManager.flowObjectiveService.forward(deviceId, fwdObj);
Charles Chanc91c8782016-03-30 17:54:24 -0700834 return existingPorts.isEmpty();
835 }
836
Charles Chan72779502016-04-23 17:36:10 -0700837 /**
838 * Removes entire group on given device.
839 *
840 * @param deviceId device ID
841 * @param mcastIp multicast group to be removed
842 * @param assignedVlan assigned VLAN ID
843 */
844 private void removeGroupFromDevice(DeviceId deviceId, IpAddress mcastIp,
845 VlanId assignedVlan) {
846 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId);
847 // This device is not serving this multicast group
848 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
849 log.warn("{} is not serving {}. Abort.", deviceId, mcastIp);
850 return;
851 }
852 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
853 // NOTE: Rely on GroupStore garbage collection rather than explicitly
854 // remove L3MG since there might be other flows/groups refer to
855 // the same L2IG
856 ObjectiveContext context = new DefaultObjectiveContext(
857 (objective) -> log.debug("Successfully remove {} on {}, vlan {}",
858 mcastIp, deviceId, assignedVlan),
859 (objective, error) ->
860 log.warn("Failed to remove {} on {}, vlan {}: {}",
861 mcastIp, deviceId, assignedVlan, error));
862 ForwardingObjective fwdObj = fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
863 srManager.flowObjectiveService.forward(deviceId, fwdObj);
864 mcastNextObjStore.remove(mcastStoreKey);
865 mcastRoleStore.remove(mcastStoreKey);
866 }
867
Pier Luigi580fd8a2018-01-16 10:47:50 +0100868 private void installPath(IpAddress mcastIp, ConnectPoint source, Path mcastPath) {
869 // Get Links
870 List<Link> links = mcastPath.links();
871 // For each link, modify the next on the source device adding the src port
872 // and a new filter objective on the destination port
873 links.forEach(link -> {
874 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
875 assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
876 addFilterToDevice(link.dst().deviceId(), link.dst().port(), assignedVlan(null),
877 mcastIp);
878 });
879 // Setup new transit mcast role
880 mcastRoleStore.put(new McastStoreKey(mcastIp, links.get(0).dst().deviceId()),
881 McastRole.TRANSIT);
Charles Chan72779502016-04-23 17:36:10 -0700882 }
883
Charles Chanc91c8782016-03-30 17:54:24 -0700884 /**
885 * Creates a next objective builder for multicast.
886 *
887 * @param mcastIp multicast group
888 * @param assignedVlan assigned VLAN ID
889 * @param outPorts set of output port numbers
890 * @return next objective builder
891 */
892 private NextObjective.Builder nextObjBuilder(IpAddress mcastIp,
Pier Luigi4f0dd212018-01-19 10:24:53 +0100893 VlanId assignedVlan, Set<PortNumber> outPorts, Integer nextId) {
894 // If nextId is null allocate a new one
895 if (nextId == null) {
896 nextId = srManager.flowObjectiveService.allocateNextId();
897 }
Charles Chanc91c8782016-03-30 17:54:24 -0700898
899 TrafficSelector metadata =
900 DefaultTrafficSelector.builder()
901 .matchVlanId(assignedVlan)
902 .matchIPDst(mcastIp.toIpPrefix())
903 .build();
904
905 NextObjective.Builder nextObjBuilder = DefaultNextObjective
906 .builder().withId(nextId)
907 .withType(NextObjective.Type.BROADCAST).fromApp(srManager.appId)
908 .withMeta(metadata);
909
910 outPorts.forEach(port -> {
911 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
912 if (egressVlan().equals(VlanId.NONE)) {
913 tBuilder.popVlan();
914 }
915 tBuilder.setOutput(port);
916 nextObjBuilder.addTreatment(tBuilder.build());
917 });
918
919 return nextObjBuilder;
920 }
921
922 /**
923 * Creates a forwarding objective builder for multicast.
924 *
925 * @param mcastIp multicast group
926 * @param assignedVlan assigned VLAN ID
927 * @param nextId next ID of the L3 multicast group
928 * @return forwarding objective builder
929 */
930 private ForwardingObjective.Builder fwdObjBuilder(IpAddress mcastIp,
931 VlanId assignedVlan, int nextId) {
932 TrafficSelector.Builder sbuilder = DefaultTrafficSelector.builder();
Julia Fergusonf1d9c342017-08-10 18:15:24 +0000933 IpPrefix mcastPrefix = mcastIp.toIpPrefix();
934
935 if (mcastIp.isIp4()) {
936 sbuilder.matchEthType(Ethernet.TYPE_IPV4);
937 sbuilder.matchIPDst(mcastPrefix);
938 } else {
939 sbuilder.matchEthType(Ethernet.TYPE_IPV6);
940 sbuilder.matchIPv6Dst(mcastPrefix);
941 }
942
943
Charles Chanc91c8782016-03-30 17:54:24 -0700944 TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder();
945 metabuilder.matchVlanId(assignedVlan);
946
947 ForwardingObjective.Builder fwdBuilder = DefaultForwardingObjective.builder();
948 fwdBuilder.withSelector(sbuilder.build())
949 .withMeta(metabuilder.build())
950 .nextStep(nextId)
951 .withFlag(ForwardingObjective.Flag.SPECIFIC)
952 .fromApp(srManager.appId)
953 .withPriority(SegmentRoutingService.DEFAULT_PRIORITY);
954 return fwdBuilder;
955 }
956
957 /**
958 * Creates a filtering objective builder for multicast.
959 *
960 * @param deviceId Device ID
961 * @param ingressPort ingress port of the multicast stream
962 * @param assignedVlan assigned VLAN ID
963 * @return filtering objective builder
964 */
965 private FilteringObjective.Builder filterObjBuilder(DeviceId deviceId, PortNumber ingressPort,
Julia Fergusonf1d9c342017-08-10 18:15:24 +0000966 VlanId assignedVlan, IpAddress mcastIp) {
Charles Chanc91c8782016-03-30 17:54:24 -0700967 FilteringObjective.Builder filtBuilder = DefaultFilteringObjective.builder();
Charles Chan0932eca2016-06-28 16:50:13 -0700968
Julia Fergusonf1d9c342017-08-10 18:15:24 +0000969 if (mcastIp.isIp4()) {
970 filtBuilder.withKey(Criteria.matchInPort(ingressPort))
971 .addCondition(Criteria.matchEthDstMasked(MacAddress.IPV4_MULTICAST,
972 MacAddress.IPV4_MULTICAST_MASK))
973 .addCondition(Criteria.matchVlanId(egressVlan()))
974 .withPriority(SegmentRoutingService.DEFAULT_PRIORITY);
975 } else {
976 filtBuilder.withKey(Criteria.matchInPort(ingressPort))
977 .addCondition(Criteria.matchEthDstMasked(MacAddress.IPV6_MULTICAST,
978 MacAddress.IPV6_MULTICAST_MASK))
979 .addCondition(Criteria.matchVlanId(egressVlan()))
980 .withPriority(SegmentRoutingService.DEFAULT_PRIORITY);
981 }
Charles Chan0932eca2016-06-28 16:50:13 -0700982 TrafficTreatment tt = DefaultTrafficTreatment.builder()
983 .pushVlan().setVlanId(assignedVlan).build();
984 filtBuilder.withMeta(tt);
985
Charles Chanc91c8782016-03-30 17:54:24 -0700986 return filtBuilder.permit().fromApp(srManager.appId);
987 }
988
989 /**
990 * Gets output ports information from treatments.
991 *
992 * @param treatments collection of traffic treatments
993 * @return set of output port numbers
994 */
995 private Set<PortNumber> getPorts(Collection<TrafficTreatment> treatments) {
996 ImmutableSet.Builder<PortNumber> builder = ImmutableSet.builder();
997 treatments.forEach(treatment -> {
998 treatment.allInstructions().stream()
999 .filter(instr -> instr instanceof OutputInstruction)
1000 .forEach(instr -> {
1001 builder.add(((OutputInstruction) instr).port());
1002 });
1003 });
1004 return builder.build();
1005 }
1006
Pier Luigi51ee7c02018-02-23 19:57:40 +01001007 // Utility method to verify is a link is a pair-link
1008 private boolean isPairLink(Link link) {
1009 // Take src id, src port, dst id and dst port
1010 final DeviceId srcId = link.src().deviceId();
1011 final PortNumber srcPort = link.src().port();
1012 final DeviceId dstId = link.dst().deviceId();
1013 final PortNumber dstPort = link.dst().port();
1014 // init as true
1015 boolean isPairLink = true;
1016 try {
1017 // If one of this condition is not true; it is not a pair link
1018 if (!(srManager.deviceConfiguration.isEdgeDevice(srcId) &&
1019 srManager.deviceConfiguration.isEdgeDevice(dstId) &&
1020 srManager.deviceConfiguration.getPairDeviceId(srcId).equals(dstId) &&
1021 srManager.deviceConfiguration.getPairLocalPort(srcId).equals(srcPort) &&
1022 srManager.deviceConfiguration.getPairLocalPort(dstId).equals(dstPort))) {
1023 isPairLink = false;
1024 }
1025 } catch (DeviceConfigNotFoundException e) {
1026 // Configuration not provided
1027 log.warn("Could not check if the link {} is pairlink "
1028 + "config not yet provided", link);
1029 isPairLink = false;
1030 }
1031 return isPairLink;
1032 }
1033
Charles Chanc91c8782016-03-30 17:54:24 -07001034 /**
1035 * Gets a path from src to dst.
1036 * If a path was allocated before, returns the allocated path.
1037 * Otherwise, randomly pick one from available paths.
1038 *
1039 * @param src source device ID
1040 * @param dst destination device ID
1041 * @param mcastIp multicast group
1042 * @return an optional path from src to dst
1043 */
1044 private Optional<Path> getPath(DeviceId src, DeviceId dst, IpAddress mcastIp) {
Pier Luigid8a15162018-02-15 16:33:08 +01001045 // Takes a snapshot of the topology
1046 final Topology currentTopology = topologyService.currentTopology();
Charles Chanc91c8782016-03-30 17:54:24 -07001047 List<Path> allPaths = Lists.newArrayList(
Pier Luigid8a15162018-02-15 16:33:08 +01001048 topologyService.getPaths(currentTopology, src, dst)
1049 );
Pier Luigi51ee7c02018-02-23 19:57:40 +01001050 // Create list of valid paths
1051 allPaths.removeIf(path -> path.links().stream().anyMatch(this::isPairLink));
1052 // If there are no valid paths, just exit
Charles Chan72779502016-04-23 17:36:10 -07001053 log.debug("{} path(s) found from {} to {}", allPaths.size(), src, dst);
Charles Chanc91c8782016-03-30 17:54:24 -07001054 if (allPaths.isEmpty()) {
Charles Chanc91c8782016-03-30 17:54:24 -07001055 return Optional.empty();
1056 }
1057
Pier Luigi91573e12018-01-23 16:06:38 +01001058 // Create a map index of suitablity-to-list of paths. For example
1059 // a path in the list associated to the index 1 shares only the
1060 // first hop and it is less suitable of a path belonging to the index
1061 // 2 that shares leaf-spine.
1062 Map<Integer, List<Path>> eligiblePaths = Maps.newHashMap();
1063 // Some init steps
1064 int nhop;
1065 McastStoreKey mcastStoreKey;
1066 Link hop;
1067 PortNumber srcPort;
1068 Set<PortNumber> existingPorts;
1069 NextObjective nextObj;
1070 // Iterate over paths looking for eligible paths
1071 for (Path path : allPaths) {
1072 // Unlikely, it will happen...
1073 if (!src.equals(path.links().get(0).src().deviceId())) {
1074 continue;
1075 }
1076 nhop = 0;
1077 // Iterate over the links
1078 while (nhop < path.links().size()) {
1079 // Get the link and verify if a next related
1080 // to the src device exist in the store
1081 hop = path.links().get(nhop);
1082 mcastStoreKey = new McastStoreKey(mcastIp, hop.src().deviceId());
1083 // It does not exist in the store, exit
1084 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
1085 break;
Charles Chanc91c8782016-03-30 17:54:24 -07001086 }
Pier Luigi91573e12018-01-23 16:06:38 +01001087 // Get the output ports on the next
1088 nextObj = mcastNextObjStore.get(mcastStoreKey).value();
1089 existingPorts = getPorts(nextObj.next());
1090 // And the src port on the link
1091 srcPort = hop.src().port();
1092 // the src port is not used as output, exit
1093 if (!existingPorts.contains(srcPort)) {
1094 break;
1095 }
1096 nhop++;
1097 }
1098 // n_hop defines the index
1099 if (nhop > 0) {
1100 eligiblePaths.compute(nhop, (index, paths) -> {
1101 paths = paths == null ? Lists.newArrayList() : paths;
1102 paths.add(path);
1103 return paths;
1104 });
Charles Chanc91c8782016-03-30 17:54:24 -07001105 }
1106 }
Pier Luigi91573e12018-01-23 16:06:38 +01001107
1108 // No suitable paths
1109 if (eligiblePaths.isEmpty()) {
1110 log.debug("No eligiblePath(s) found from {} to {}", src, dst);
1111 // Otherwise, randomly pick a path
1112 Collections.shuffle(allPaths);
1113 return allPaths.stream().findFirst();
1114 }
1115
1116 // Let's take the best ones
1117 Integer bestIndex = eligiblePaths.keySet()
1118 .stream()
1119 .sorted(Comparator.reverseOrder())
1120 .findFirst().orElse(null);
1121 List<Path> bestPaths = eligiblePaths.get(bestIndex);
1122 log.debug("{} eligiblePath(s) found from {} to {}",
1123 bestPaths.size(), src, dst);
1124 // randomly pick a path on the highest index
1125 Collections.shuffle(bestPaths);
1126 return bestPaths.stream().findFirst();
Charles Chanc91c8782016-03-30 17:54:24 -07001127 }
1128
1129 /**
Charles Chan72779502016-04-23 17:36:10 -07001130 * Gets device(s) of given role in given multicast group.
1131 *
1132 * @param mcastIp multicast IP
1133 * @param role multicast role
1134 * @return set of device ID or empty set if not found
1135 */
1136 private Set<DeviceId> getDevice(IpAddress mcastIp, McastRole role) {
1137 return mcastRoleStore.entrySet().stream()
1138 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
1139 entry.getValue().value() == role)
1140 .map(Map.Entry::getKey).map(McastStoreKey::deviceId)
1141 .collect(Collectors.toSet());
1142 }
1143
1144 /**
Charles Chana8f9dee2016-05-16 18:44:13 -07001145 * Gets source connect point of given multicast group.
1146 *
1147 * @param mcastIp multicast IP
1148 * @return source connect point or null if not found
1149 */
1150 private ConnectPoint getSource(IpAddress mcastIp) {
1151 return srManager.multicastRouteService.getRoutes().stream()
1152 .filter(mcastRoute -> mcastRoute.group().equals(mcastIp))
1153 .map(mcastRoute -> srManager.multicastRouteService.fetchSource(mcastRoute))
1154 .findAny().orElse(null);
1155 }
1156
1157 /**
Charles Chan72779502016-04-23 17:36:10 -07001158 * Gets groups which is affected by the link down event.
1159 *
1160 * @param link link going down
1161 * @return a set of multicast IpAddress
1162 */
1163 private Set<IpAddress> getAffectedGroups(Link link) {
1164 DeviceId deviceId = link.src().deviceId();
1165 PortNumber port = link.src().port();
1166 return mcastNextObjStore.entrySet().stream()
1167 .filter(entry -> entry.getKey().deviceId().equals(deviceId) &&
1168 getPorts(entry.getValue().value().next()).contains(port))
1169 .map(Map.Entry::getKey).map(McastStoreKey::mcastIp)
1170 .collect(Collectors.toSet());
1171 }
1172
1173 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +01001174 * Gets groups which are affected by the device down event.
1175 *
1176 * @param deviceId device going down
1177 * @return a set of multicast IpAddress
1178 */
1179 private Set<IpAddress> getAffectedGroups(DeviceId deviceId) {
1180 return mcastNextObjStore.entrySet().stream()
1181 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
1182 .map(Map.Entry::getKey).map(McastStoreKey::mcastIp)
1183 .collect(Collectors.toSet());
1184 }
1185
1186 /**
Charles Chanc91c8782016-03-30 17:54:24 -07001187 * Gets egress VLAN from McastConfig.
1188 *
1189 * @return egress VLAN or VlanId.NONE if not configured
1190 */
1191 private VlanId egressVlan() {
1192 McastConfig mcastConfig =
1193 srManager.cfgService.getConfig(coreAppId, McastConfig.class);
1194 return (mcastConfig != null) ? mcastConfig.egressVlan() : VlanId.NONE;
1195 }
1196
1197 /**
1198 * Gets assigned VLAN according to the value of egress VLAN.
Charles Chana8f9dee2016-05-16 18:44:13 -07001199 * If connect point is specified, try to reuse the assigned VLAN on the connect point.
Charles Chanc91c8782016-03-30 17:54:24 -07001200 *
Charles Chana8f9dee2016-05-16 18:44:13 -07001201 * @param cp connect point; Can be null if not specified
1202 * @return assigned VLAN ID
Charles Chanc91c8782016-03-30 17:54:24 -07001203 */
Charles Chana8f9dee2016-05-16 18:44:13 -07001204 private VlanId assignedVlan(ConnectPoint cp) {
1205 // Use the egressVlan if it is tagged
1206 if (!egressVlan().equals(VlanId.NONE)) {
1207 return egressVlan();
1208 }
1209 // Reuse unicast VLAN if the port has subnet configured
1210 if (cp != null) {
Charles Chanf9759582017-10-20 19:09:16 -07001211 VlanId untaggedVlan = srManager.getInternalVlanId(cp);
Charles Chan10b0fb72017-02-02 16:20:42 -08001212 return (untaggedVlan != null) ? untaggedVlan : INTERNAL_VLAN;
Charles Chana8f9dee2016-05-16 18:44:13 -07001213 }
Charles Chan10b0fb72017-02-02 16:20:42 -08001214 // Use DEFAULT_VLAN if none of the above matches
1215 return SegmentRoutingManager.INTERNAL_VLAN;
Charles Chanc91c8782016-03-30 17:54:24 -07001216 }
Charles Chan72779502016-04-23 17:36:10 -07001217
1218 /**
1219 * Gets the spine-facing port on ingress device of given multicast group.
1220 *
1221 * @param mcastIp multicast IP
1222 * @return spine-facing port on ingress device
1223 */
1224 private PortNumber ingressTransitPort(IpAddress mcastIp) {
1225 DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
1226 .stream().findAny().orElse(null);
1227 if (ingressDevice != null) {
1228 NextObjective nextObj = mcastNextObjStore
1229 .get(new McastStoreKey(mcastIp, ingressDevice)).value();
1230 Set<PortNumber> ports = getPorts(nextObj.next());
1231
1232 for (PortNumber port : ports) {
1233 // Spine-facing port should have no subnet and no xconnect
1234 if (srManager.deviceConfiguration != null &&
Pier Ventreb6a7f342016-11-26 21:05:22 -08001235 srManager.deviceConfiguration.getPortSubnets(ingressDevice, port).isEmpty() &&
Charles Chan82f19972016-05-17 13:13:55 -07001236 !srManager.xConnectHandler.hasXConnect(new ConnectPoint(ingressDevice, port))) {
Charles Chan72779502016-04-23 17:36:10 -07001237 return port;
1238 }
1239 }
1240 }
1241 return null;
1242 }
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001243
1244 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +01001245 * Verify if the given device has sinks
1246 * for the multicast group.
1247 *
1248 * @param deviceId device Id
1249 * @param mcastIp multicast IP
1250 * @return true if the device has sink for the group.
1251 * False otherwise.
1252 */
1253 private boolean hasSinks(DeviceId deviceId, IpAddress mcastIp) {
1254 if (deviceId != null) {
1255 // Get the nextobjective
1256 Versioned<NextObjective> versionedNextObj = mcastNextObjStore.get(
1257 new McastStoreKey(mcastIp, deviceId)
1258 );
1259 // If it exists
1260 if (versionedNextObj != null) {
1261 NextObjective nextObj = versionedNextObj.value();
1262 // Retrieves all the output ports
1263 Set<PortNumber> ports = getPorts(nextObj.next());
1264 // Tries to find at least one port that is not spine-facing
1265 for (PortNumber port : ports) {
1266 // Spine-facing port should have no subnet and no xconnect
1267 if (srManager.deviceConfiguration != null &&
1268 (!srManager.deviceConfiguration.getPortSubnets(deviceId, port).isEmpty() ||
1269 srManager.xConnectHandler.hasXConnect(new ConnectPoint(deviceId, port)))) {
1270 return true;
1271 }
1272 }
1273 }
1274 }
1275 return false;
1276 }
1277
1278 /**
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001279 * Removes filtering objective for given device and port.
1280 *
1281 * @param deviceId device ID
1282 * @param port ingress port number
1283 * @param assignedVlan assigned VLAN ID
1284 * @param mcastIp multicast IP address
1285 */
1286 private void removeFilterToDevice(DeviceId deviceId, PortNumber port, VlanId assignedVlan, IpAddress mcastIp) {
1287 // Do nothing if the port is configured as suppressed
1288 ConnectPoint connectPoint = new ConnectPoint(deviceId, port);
1289 SegmentRoutingAppConfig appConfig = srManager.cfgService
1290 .getConfig(srManager.appId, SegmentRoutingAppConfig.class);
1291 if (appConfig != null && appConfig.suppressSubnet().contains(connectPoint)) {
1292 log.info("Ignore suppressed port {}", connectPoint);
1293 return;
1294 }
1295
1296 FilteringObjective.Builder filtObjBuilder =
1297 filterObjBuilder(deviceId, port, assignedVlan, mcastIp);
1298 ObjectiveContext context = new DefaultObjectiveContext(
1299 (objective) -> log.debug("Successfully removed filter on {}/{}, vlan {}",
1300 deviceId, port.toLong(), assignedVlan),
1301 (objective, error) ->
1302 log.warn("Failed to remove filter on {}/{}, vlan {}: {}",
1303 deviceId, port.toLong(), assignedVlan, error));
1304 srManager.flowObjectiveService.filter(deviceId, filtObjBuilder.remove(context));
1305 }
1306
1307 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +01001308 * Updates filtering objective for given device and port.
1309 * It is called in general when the mcast config has been
1310 * changed.
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001311 *
1312 * @param deviceId device ID
1313 * @param portNum ingress port number
1314 * @param vlanId assigned VLAN ID
1315 * @param install true to add, false to remove
1316 */
1317 protected void updateFilterToDevice(DeviceId deviceId, PortNumber portNum,
1318 VlanId vlanId, boolean install) {
Pier Luigi35dab3f2018-01-25 16:16:02 +01001319 lastMcastChange = Instant.now();
1320 mcastLock();
1321 try {
1322 // Iterates over the route and updates properly the filtering objective
1323 // on the source device.
1324 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
1325 ConnectPoint source = srManager.multicastRouteService.fetchSource(mcastRoute);
1326 if (source.deviceId().equals(deviceId) && source.port().equals(portNum)) {
1327 if (install) {
1328 addFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group());
1329 } else {
1330 removeFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group());
1331 }
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001332 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001333 });
1334 } finally {
1335 mcastUnlock();
1336 }
1337 }
1338
Pier Luigi6786b922018-02-02 16:19:11 +01001339 private boolean isLeader(ConnectPoint source) {
1340 // Continue only when we have the mastership on the operation
1341 if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
1342 // When the source is available we just check the mastership
1343 if (srManager.deviceService.isAvailable(source.deviceId())) {
1344 return false;
1345 }
1346 // Fallback with Leadership service
1347 // source id is used a topic
1348 NodeId leader = srManager.leadershipService.runForLeadership(
1349 source.deviceId().toString()).leaderNodeId();
1350 // Verify if this node is the leader
1351 if (!srManager.clusterService.getLocalNode().id().equals(leader)) {
1352 return false;
1353 }
1354 }
1355 // Done
1356 return true;
1357 }
1358
Pier Luigi35dab3f2018-01-25 16:16:02 +01001359 /**
1360 * Performs bucket verification operation for all mcast groups in the devices.
1361 * Firstly, it verifies that mcast is stable before trying verification operation.
1362 * Verification consists in creating new nexts with VERIFY operation. Actually,
1363 * the operation is totally delegated to the driver.
1364 */
1365 private final class McastBucketCorrector implements Runnable {
1366
1367 @Override
1368 public void run() {
1369 // Verify if the Mcast has been stable for MCAST_STABLITY_THRESHOLD
1370 if (!isMcastStable()) {
1371 return;
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001372 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001373 // Acquires lock
1374 mcastLock();
1375 try {
1376 // Iterates over the routes and verify the related next objectives
1377 srManager.multicastRouteService.getRoutes()
1378 .stream()
1379 .map(McastRoute::group)
1380 .forEach(mcastIp -> {
1381 log.trace("Running mcast buckets corrector for mcast group: {}",
1382 mcastIp);
1383
1384 // For each group we get current information in the store
1385 // and issue a check of the next objectives in place
1386 DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
1387 .stream().findAny().orElse(null);
1388 DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT)
1389 .stream().findAny().orElse(null);
1390 Set<DeviceId> egressDevices = getDevice(mcastIp, McastRole.EGRESS);
1391 ConnectPoint source = getSource(mcastIp);
1392
1393 // Do not proceed if ingress device or source of this group are missing
1394 if (ingressDevice == null || source == null) {
1395 log.warn("Unable to run buckets corrector. " +
1396 "Missing ingress {} or source {} for group {}",
1397 ingressDevice, source, mcastIp);
1398 return;
1399 }
1400
1401 // Continue only when this instance is the master of source device
1402 if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
1403 log.trace("Unable to run buckets corrector. " +
1404 "Skip {} due to lack of mastership " +
1405 "of the source device {}",
1406 mcastIp, source.deviceId());
1407 return;
1408 }
1409
1410 // Create the set of the devices to be processed
1411 ImmutableSet.Builder<DeviceId> devicesBuilder = ImmutableSet.builder();
1412 devicesBuilder.add(ingressDevice);
1413 if (transitDevice != null) {
1414 devicesBuilder.add(transitDevice);
1415 }
1416 if (!egressDevices.isEmpty()) {
1417 devicesBuilder.addAll(egressDevices);
1418 }
1419 Set<DeviceId> devicesToProcess = devicesBuilder.build();
1420
1421 // Iterate over the devices
1422 devicesToProcess.forEach(deviceId -> {
1423 McastStoreKey currentKey = new McastStoreKey(mcastIp, deviceId);
1424 // If next exists in our store verify related next objective
1425 if (mcastNextObjStore.containsKey(currentKey)) {
1426 NextObjective currentNext = mcastNextObjStore.get(currentKey).value();
1427 // Get current ports
1428 Set<PortNumber> currentPorts = getPorts(currentNext.next());
1429 // Rebuild the next objective
1430 currentNext = nextObjBuilder(
1431 mcastIp,
1432 assignedVlan(deviceId.equals(source.deviceId()) ? source : null),
1433 currentPorts,
1434 currentNext.id()
1435 ).verify();
1436 // Send to the flowobjective service
1437 srManager.flowObjectiveService.next(deviceId, currentNext);
1438 } else {
Pier Luigid8a15162018-02-15 16:33:08 +01001439 log.warn("Unable to run buckets corrector. " +
Pier Luigi35dab3f2018-01-25 16:16:02 +01001440 "Missing next for {} and group {}",
1441 deviceId, mcastIp);
1442 }
1443 });
1444
1445 });
1446 } finally {
1447 // Finally, it releases the lock
1448 mcastUnlock();
1449 }
1450
1451 }
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001452 }
Pier Luigi0f9635b2018-01-15 18:06:43 +01001453
1454 public Map<McastStoreKey, Integer> getMcastNextIds(IpAddress mcastIp) {
1455 // If mcast ip is present
1456 if (mcastIp != null) {
1457 return mcastNextObjStore.entrySet().stream()
1458 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
1459 .collect(Collectors.toMap(Map.Entry::getKey,
1460 entry -> entry.getValue().value().id()));
1461 }
1462 // Otherwise take all the groups
1463 return mcastNextObjStore.entrySet().stream()
1464 .collect(Collectors.toMap(Map.Entry::getKey,
1465 entry -> entry.getValue().value().id()));
1466 }
1467
1468 public Map<McastStoreKey, McastHandler.McastRole> getMcastRoles(IpAddress mcastIp) {
1469 // If mcast ip is present
1470 if (mcastIp != null) {
1471 return mcastRoleStore.entrySet().stream()
1472 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
1473 .collect(Collectors.toMap(Map.Entry::getKey,
1474 entry -> entry.getValue().value()));
1475 }
1476 // Otherwise take all the groups
1477 return mcastRoleStore.entrySet().stream()
1478 .collect(Collectors.toMap(Map.Entry::getKey,
1479 entry -> entry.getValue().value()));
1480 }
1481
1482 public Map<ConnectPoint, List<ConnectPoint>> getMcastPaths(IpAddress mcastIp) {
1483 Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
1484 // Get the source
1485 ConnectPoint source = getSource(mcastIp);
1486 // Source cannot be null, we don't know the starting point
1487 if (source != null) {
1488 // Init steps
1489 Set<DeviceId> visited = Sets.newHashSet();
1490 List<ConnectPoint> currentPath = Lists.newArrayList(
1491 source
1492 );
1493 // Build recursively the mcast paths
1494 buildMcastPaths(source.deviceId(), visited, mcastPaths, currentPath, mcastIp);
1495 }
1496 return mcastPaths;
1497 }
1498
1499 private void buildMcastPaths(DeviceId toVisit, Set<DeviceId> visited,
1500 Map<ConnectPoint, List<ConnectPoint>> mcastPaths,
1501 List<ConnectPoint> currentPath, IpAddress mcastIp) {
1502 // If we have visited the node to visit
1503 // there is a loop
1504 if (visited.contains(toVisit)) {
1505 return;
1506 }
1507 // Visit next-hop
1508 visited.add(toVisit);
1509 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, toVisit);
1510 // Looking for next-hops
1511 if (mcastNextObjStore.containsKey(mcastStoreKey)) {
1512 // Build egress connectpoints
1513 NextObjective nextObjective = mcastNextObjStore.get(mcastStoreKey).value();
1514 // Get Ports
1515 Set<PortNumber> outputPorts = getPorts(nextObjective.next());
1516 // Build relative cps
1517 ImmutableSet.Builder<ConnectPoint> cpBuilder = ImmutableSet.builder();
1518 outputPorts.forEach(portNumber -> cpBuilder.add(new ConnectPoint(toVisit, portNumber)));
1519 Set<ConnectPoint> egressPoints = cpBuilder.build();
1520 // Define other variables for the next steps
1521 Set<Link> egressLinks;
1522 List<ConnectPoint> newCurrentPath;
1523 Set<DeviceId> newVisited;
1524 DeviceId newToVisit;
1525 for (ConnectPoint egressPoint : egressPoints) {
1526 egressLinks = srManager.linkService.getEgressLinks(egressPoint);
1527 // If it does not have egress links, stop
1528 if (egressLinks.isEmpty()) {
1529 // Add the connect points to the path
1530 newCurrentPath = Lists.newArrayList(currentPath);
1531 newCurrentPath.add(0, egressPoint);
1532 // Save in the map
1533 mcastPaths.put(egressPoint, newCurrentPath);
1534 } else {
1535 newVisited = Sets.newHashSet(visited);
1536 // Iterate over the egress links for the next hops
1537 for (Link egressLink : egressLinks) {
1538 // Update to visit
1539 newToVisit = egressLink.dst().deviceId();
1540 // Add the connect points to the path
1541 newCurrentPath = Lists.newArrayList(currentPath);
1542 newCurrentPath.add(0, egressPoint);
1543 newCurrentPath.add(0, egressLink.dst());
1544 // Go to the next hop
1545 buildMcastPaths(newToVisit, newVisited, mcastPaths, newCurrentPath, mcastIp);
1546 }
1547 }
1548 }
1549 }
1550 }
1551
Charles Chanc91c8782016-03-30 17:54:24 -07001552}