blob: cd59ee5dc4b8322bbe4528fda73554d96a28c822 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
Pier Luigi69f774d2018-02-28 12:10:50 +010017package org.onosproject.segmentrouting.mcast;
Charles Chanc91c8782016-03-30 17:54:24 -070018
19import com.google.common.collect.ImmutableSet;
20import com.google.common.collect.Lists;
Pier Luigi91573e12018-01-23 16:06:38 +010021import com.google.common.collect.Maps;
Charles Chanc91c8782016-03-30 17:54:24 -070022import com.google.common.collect.Sets;
23import org.onlab.packet.Ethernet;
24import org.onlab.packet.IpAddress;
25import org.onlab.packet.IpPrefix;
26import org.onlab.packet.MacAddress;
27import org.onlab.packet.VlanId;
28import org.onlab.util.KryoNamespace;
Pier Luigi580fd8a2018-01-16 10:47:50 +010029import org.onosproject.cluster.NodeId;
Charles Chanc91c8782016-03-30 17:54:24 -070030import org.onosproject.core.ApplicationId;
31import org.onosproject.core.CoreService;
Ray Milkeyae0068a2017-08-15 11:02:29 -070032import org.onosproject.net.config.basics.McastConfig;
Charles Chanc91c8782016-03-30 17:54:24 -070033import org.onosproject.net.ConnectPoint;
34import org.onosproject.net.DeviceId;
35import org.onosproject.net.Link;
36import org.onosproject.net.Path;
37import org.onosproject.net.PortNumber;
38import org.onosproject.net.flow.DefaultTrafficSelector;
39import org.onosproject.net.flow.DefaultTrafficTreatment;
40import org.onosproject.net.flow.TrafficSelector;
41import org.onosproject.net.flow.TrafficTreatment;
42import org.onosproject.net.flow.criteria.Criteria;
43import org.onosproject.net.flow.instructions.Instructions.OutputInstruction;
44import org.onosproject.net.flowobjective.DefaultFilteringObjective;
45import org.onosproject.net.flowobjective.DefaultForwardingObjective;
46import org.onosproject.net.flowobjective.DefaultNextObjective;
Charles Chan72779502016-04-23 17:36:10 -070047import org.onosproject.net.flowobjective.DefaultObjectiveContext;
Charles Chanc91c8782016-03-30 17:54:24 -070048import org.onosproject.net.flowobjective.FilteringObjective;
49import org.onosproject.net.flowobjective.ForwardingObjective;
50import org.onosproject.net.flowobjective.NextObjective;
Charles Chan72779502016-04-23 17:36:10 -070051import org.onosproject.net.flowobjective.ObjectiveContext;
Charles Chanc91c8782016-03-30 17:54:24 -070052import org.onosproject.net.mcast.McastEvent;
Pier Luigi35dab3f2018-01-25 16:16:02 +010053import org.onosproject.net.mcast.McastRoute;
Charles Chanc91c8782016-03-30 17:54:24 -070054import org.onosproject.net.mcast.McastRouteInfo;
Pier Luigid8a15162018-02-15 16:33:08 +010055import org.onosproject.net.topology.Topology;
Charles Chanc91c8782016-03-30 17:54:24 -070056import org.onosproject.net.topology.TopologyService;
Pier Luigi69f774d2018-02-28 12:10:50 +010057import org.onosproject.segmentrouting.SegmentRoutingManager;
58import org.onosproject.segmentrouting.SegmentRoutingService;
Pier Luigi51ee7c02018-02-23 19:57:40 +010059import org.onosproject.segmentrouting.config.DeviceConfigNotFoundException;
Charles Chan370a65b2016-05-10 17:29:47 -070060import org.onosproject.segmentrouting.config.SegmentRoutingAppConfig;
Charles Chan72779502016-04-23 17:36:10 -070061import org.onosproject.segmentrouting.storekey.McastStoreKey;
Charles Chanc91c8782016-03-30 17:54:24 -070062import org.onosproject.store.serializers.KryoNamespaces;
63import org.onosproject.store.service.ConsistentMap;
64import org.onosproject.store.service.Serializer;
65import org.onosproject.store.service.StorageService;
Pier Luigi580fd8a2018-01-16 10:47:50 +010066import org.onosproject.store.service.Versioned;
Charles Chanc91c8782016-03-30 17:54:24 -070067import org.slf4j.Logger;
68import org.slf4j.LoggerFactory;
69
Pier Luigi35dab3f2018-01-25 16:16:02 +010070import java.time.Instant;
Charles Chanc91c8782016-03-30 17:54:24 -070071import java.util.Collection;
72import java.util.Collections;
Pier Luigi91573e12018-01-23 16:06:38 +010073import java.util.Comparator;
Charles Chanc91c8782016-03-30 17:54:24 -070074import java.util.List;
Charles Chan72779502016-04-23 17:36:10 -070075import java.util.Map;
Charles Chanc91c8782016-03-30 17:54:24 -070076import java.util.Optional;
77import java.util.Set;
Pier Luigi35dab3f2018-01-25 16:16:02 +010078import java.util.concurrent.ScheduledExecutorService;
79import java.util.concurrent.TimeUnit;
80import java.util.concurrent.locks.Lock;
81import java.util.concurrent.locks.ReentrantLock;
Charles Chan72779502016-04-23 17:36:10 -070082import java.util.stream.Collectors;
83
84import static com.google.common.base.Preconditions.checkState;
Pier Luigi35dab3f2018-01-25 16:16:02 +010085import static java.util.concurrent.Executors.newScheduledThreadPool;
86import static org.onlab.util.Tools.groupedThreads;
Charles Chan10b0fb72017-02-02 16:20:42 -080087import static org.onosproject.segmentrouting.SegmentRoutingManager.INTERNAL_VLAN;
Charles Chanc91c8782016-03-30 17:54:24 -070088
/**
 * Handles Multicast related events.
 */
Charles Chan1eaf4802016-04-18 13:44:03 -070092public class McastHandler {
93 private static final Logger log = LoggerFactory.getLogger(McastHandler.class);
Charles Chanc91c8782016-03-30 17:54:24 -070094 private final SegmentRoutingManager srManager;
95 private final ApplicationId coreAppId;
Charles Chan82f19972016-05-17 13:13:55 -070096 private final StorageService storageService;
97 private final TopologyService topologyService;
Charles Chan72779502016-04-23 17:36:10 -070098 private final ConsistentMap<McastStoreKey, NextObjective> mcastNextObjStore;
99 private final KryoNamespace.Builder mcastKryo;
100 private final ConsistentMap<McastStoreKey, McastRole> mcastRoleStore;
101
Pier Luigi35dab3f2018-01-25 16:16:02 +0100102 // Mcast lock to serialize local operations
103 private final Lock mcastLock = new ReentrantLock();
104
105 /**
106 * Acquires the lock used when making mcast changes.
107 */
108 private void mcastLock() {
109 mcastLock.lock();
110 }
111
112 /**
113 * Releases the lock used when making mcast changes.
114 */
115 private void mcastUnlock() {
116 mcastLock.unlock();
117 }
118
119 // Stability threshold for Mcast. Seconds
120 private static final long MCAST_STABLITY_THRESHOLD = 5;
121 // Last change done
122 private Instant lastMcastChange = Instant.now();
123
124 /**
125 * Determines if mcast in the network has been stable in the last
126 * MCAST_STABLITY_THRESHOLD seconds, by comparing the current time
127 * to the last mcast change timestamp.
128 *
129 * @return true if stable
130 */
131 private boolean isMcastStable() {
132 long last = (long) (lastMcastChange.toEpochMilli() / 1000.0);
133 long now = (long) (Instant.now().toEpochMilli() / 1000.0);
134 log.debug("Mcast stable since {}s", now - last);
135 return (now - last) > MCAST_STABLITY_THRESHOLD;
136 }
137
138 // Verify interval for Mcast
139 private static final long MCAST_VERIFY_INTERVAL = 30;
140
141 // Executor for mcast bucket corrector
142 private ScheduledExecutorService executorService
143 = newScheduledThreadPool(1, groupedThreads("mcastBktCorrector", "mcastbktC-%d", log));
144
/**
 * Constructs the McastEventHandler.
 * <p>
 * Besides wiring the services exposed by the manager, this creates the two
 * consistent maps backing the handler and schedules the periodic bucket
 * corrector (first run after 10 seconds, then every MCAST_VERIFY_INTERVAL
 * seconds).
 *
 * @param srManager Segment Routing manager
 */
public McastHandler(SegmentRoutingManager srManager) {
    this.srManager = srManager;
    this.coreAppId = srManager.coreService.getAppId(CoreService.CORE_APP_NAME);
    this.storageService = srManager.storageService;
    this.topologyService = srManager.topologyService;

    // One namespace builder, two named serializers built from it
    this.mcastKryo = new KryoNamespace.Builder()
            .register(KryoNamespaces.API)
            .register(McastStoreKey.class)
            .register(McastRole.class);

    this.mcastNextObjStore = this.storageService
            .<McastStoreKey, NextObjective>consistentMapBuilder()
            .withName("onos-mcast-nextobj-store")
            .withSerializer(Serializer.using(this.mcastKryo.build("McastHandler-NextObj")))
            .build();

    this.mcastRoleStore = this.storageService
            .<McastStoreKey, McastRole>consistentMapBuilder()
            .withName("onos-mcast-role-store")
            .withSerializer(Serializer.using(this.mcastKryo.build("McastHandler-Role")))
            .build();

    // Kick off the periodic buckets corrector
    executorService.scheduleWithFixedDelay(
            new McastBucketCorrector(), 10, MCAST_VERIFY_INTERVAL, TimeUnit.SECONDS);
}
174
175 /**
176 * Read initial multicast from mcast store.
177 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100178 public void init() {
Charles Chan72779502016-04-23 17:36:10 -0700179 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
180 ConnectPoint source = srManager.multicastRouteService.fetchSource(mcastRoute);
181 Set<ConnectPoint> sinks = srManager.multicastRouteService.fetchSinks(mcastRoute);
182 sinks.forEach(sink -> {
183 processSinkAddedInternal(source, sink, mcastRoute.group());
184 });
185 });
Charles Chanc91c8782016-03-30 17:54:24 -0700186 }
187
188 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +0100189 * Clean up when deactivating the application.
190 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100191 public void terminate() {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100192 executorService.shutdown();
193 }
194
195 /**
Charles Chanc91c8782016-03-30 17:54:24 -0700196 * Processes the SOURCE_ADDED event.
197 *
198 * @param event McastEvent with SOURCE_ADDED type
199 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100200 public void processSourceAdded(McastEvent event) {
Charles Chanc91c8782016-03-30 17:54:24 -0700201 log.info("processSourceAdded {}", event);
202 McastRouteInfo mcastRouteInfo = event.subject();
203 if (!mcastRouteInfo.isComplete()) {
204 log.info("Incompleted McastRouteInfo. Abort.");
205 return;
206 }
207 ConnectPoint source = mcastRouteInfo.source().orElse(null);
208 Set<ConnectPoint> sinks = mcastRouteInfo.sinks();
209 IpAddress mcastIp = mcastRouteInfo.route().group();
210
Pier Luigi35dab3f2018-01-25 16:16:02 +0100211 sinks.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp));
Charles Chanc91c8782016-03-30 17:54:24 -0700212 }
213
214 /**
Pier Luigie80d6b42018-02-26 12:31:38 +0100215 * Processes the SOURCE_UPDATED event.
216 *
217 * @param event McastEvent with SOURCE_UPDATED type
218 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100219 public void processSourceUpdated(McastEvent event) {
Pier Luigie80d6b42018-02-26 12:31:38 +0100220 log.info("processSourceUpdated {}", event);
221 // Get old and new data
222 McastRouteInfo mcastRouteInfo = event.subject();
223 ConnectPoint newSource = mcastRouteInfo.source().orElse(null);
224 mcastRouteInfo = event.prevSubject();
225 ConnectPoint oldSource = mcastRouteInfo.source().orElse(null);
226 // and group ip
227 IpAddress mcastIp = mcastRouteInfo.route().group();
228 // Process the update event
229 processSourceUpdatedInternal(mcastIp, newSource, oldSource);
230 }
231
232 /**
Charles Chanc91c8782016-03-30 17:54:24 -0700233 * Processes the SINK_ADDED event.
234 *
235 * @param event McastEvent with SINK_ADDED type
236 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100237 public void processSinkAdded(McastEvent event) {
Charles Chanc91c8782016-03-30 17:54:24 -0700238 log.info("processSinkAdded {}", event);
239 McastRouteInfo mcastRouteInfo = event.subject();
240 if (!mcastRouteInfo.isComplete()) {
241 log.info("Incompleted McastRouteInfo. Abort.");
242 return;
243 }
244 ConnectPoint source = mcastRouteInfo.source().orElse(null);
245 ConnectPoint sink = mcastRouteInfo.sink().orElse(null);
246 IpAddress mcastIp = mcastRouteInfo.route().group();
247
248 processSinkAddedInternal(source, sink, mcastIp);
249 }
250
251 /**
252 * Processes the SINK_REMOVED event.
253 *
254 * @param event McastEvent with SINK_REMOVED type
255 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100256 public void processSinkRemoved(McastEvent event) {
Charles Chanc91c8782016-03-30 17:54:24 -0700257 log.info("processSinkRemoved {}", event);
258 McastRouteInfo mcastRouteInfo = event.subject();
259 if (!mcastRouteInfo.isComplete()) {
260 log.info("Incompleted McastRouteInfo. Abort.");
261 return;
262 }
263 ConnectPoint source = mcastRouteInfo.source().orElse(null);
264 ConnectPoint sink = mcastRouteInfo.sink().orElse(null);
265 IpAddress mcastIp = mcastRouteInfo.route().group();
Charles Chanc91c8782016-03-30 17:54:24 -0700266
Pier Luigi35dab3f2018-01-25 16:16:02 +0100267 processSinkRemovedInternal(source, sink, mcastIp);
268 }
Charles Chan0932eca2016-06-28 16:50:13 -0700269
Pier Luigi35dab3f2018-01-25 16:16:02 +0100270 /**
Pier Luigi6786b922018-02-02 16:19:11 +0100271 * Processes the ROUTE_REMOVED event.
272 *
273 * @param event McastEvent with ROUTE_REMOVED type
274 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100275 public void processRouteRemoved(McastEvent event) {
Pier Luigi6786b922018-02-02 16:19:11 +0100276 log.info("processRouteRemoved {}", event);
277 McastRouteInfo mcastRouteInfo = event.subject();
278 if (!mcastRouteInfo.source().isPresent()) {
279 log.info("Incompleted McastRouteInfo. Abort.");
280 return;
281 }
282 // Get group ip and ingress connect point
283 IpAddress mcastIp = mcastRouteInfo.route().group();
Pier Luigie80d6b42018-02-26 12:31:38 +0100284 ConnectPoint source = mcastRouteInfo.source().orElse(null);
Pier Luigi6786b922018-02-02 16:19:11 +0100285
286 processRouteRemovedInternal(source, mcastIp);
287 }
288
289 /**
Pier Luigie80d6b42018-02-26 12:31:38 +0100290 * Process the SOURCE_UPDATED event.
291 *
292 * @param newSource the updated srouce info
293 * @param oldSource the outdated source info
294 */
295 private void processSourceUpdatedInternal(IpAddress mcastIp,
296 ConnectPoint newSource,
297 ConnectPoint oldSource) {
298 lastMcastChange = Instant.now();
299 mcastLock();
300 try {
301 log.debug("Processing source updated for group {}", mcastIp);
302
303 // Build key for the store and retrieve old data
304 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, oldSource.deviceId());
305
306 // Verify leadership on the operation
307 if (!isLeader(oldSource)) {
308 log.debug("Skip {} due to lack of leadership", mcastIp);
309 return;
310 }
311
312 // This device is not serving this multicast group
313 if (!mcastRoleStore.containsKey(mcastStoreKey) ||
314 !mcastNextObjStore.containsKey(mcastStoreKey)) {
315 log.warn("{} is not serving {}. Abort.", oldSource.deviceId(), mcastIp);
316 return;
317 }
318 NextObjective nextObjective = mcastNextObjStore.get(mcastStoreKey).value();
319 Set<PortNumber> outputPorts = getPorts(nextObjective.next());
320
321 // Let's remove old flows and groups
322 removeGroupFromDevice(oldSource.deviceId(), mcastIp, assignedVlan(oldSource));
323 // Push new flows and group
324 outputPorts.forEach(portNumber -> addPortToDevice(newSource.deviceId(), portNumber,
325 mcastIp, assignedVlan(newSource)));
326 addFilterToDevice(newSource.deviceId(), newSource.port(),
327 assignedVlan(newSource), mcastIp);
328 // Setup mcast roles
329 mcastRoleStore.put(new McastStoreKey(mcastIp, newSource.deviceId()),
330 McastRole.INGRESS);
331 } finally {
332 mcastUnlock();
333 }
334 }
335
336 /**
Pier Luigi6786b922018-02-02 16:19:11 +0100337 * Removes the entire mcast tree related to this group.
338 *
339 * @param mcastIp multicast group IP address
340 */
341 private void processRouteRemovedInternal(ConnectPoint source, IpAddress mcastIp) {
342 lastMcastChange = Instant.now();
343 mcastLock();
344 try {
Pier Luigie80d6b42018-02-26 12:31:38 +0100345 log.debug("Processing route removed for group {}", mcastIp);
Pier Luigi6786b922018-02-02 16:19:11 +0100346
347 // Find out the ingress, transit and egress device of the affected group
348 DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
349 .stream().findAny().orElse(null);
350 DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT)
351 .stream().findAny().orElse(null);
352 Set<DeviceId> egressDevices = getDevice(mcastIp, McastRole.EGRESS);
353
354 // Verify leadership on the operation
355 if (!isLeader(source)) {
356 log.debug("Skip {} due to lack of leadership", mcastIp);
357 return;
358 }
359
360 // If there are egress devices, sinks could be only on the ingress
361 if (!egressDevices.isEmpty()) {
362 egressDevices.forEach(
363 deviceId -> removeGroupFromDevice(deviceId, mcastIp, assignedVlan(null))
364 );
365 }
366 // Transit could be null
367 if (transitDevice != null) {
368 removeGroupFromDevice(transitDevice, mcastIp, assignedVlan(null));
369 }
370 // Ingress device should be not null
371 if (ingressDevice != null) {
372 removeGroupFromDevice(ingressDevice, mcastIp, assignedVlan(source));
373 }
Pier Luigi6786b922018-02-02 16:19:11 +0100374 } finally {
375 mcastUnlock();
376 }
377 }
378
379 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +0100380 * Removes a path from source to sink for given multicast group.
381 *
382 * @param source connect point of the multicast source
383 * @param sink connection point of the multicast sink
384 * @param mcastIp multicast group IP address
385 */
386 private void processSinkRemovedInternal(ConnectPoint source, ConnectPoint sink,
387 IpAddress mcastIp) {
388 lastMcastChange = Instant.now();
389 mcastLock();
390 try {
Pier Luigi6786b922018-02-02 16:19:11 +0100391 // Verify leadership on the operation
392 if (!isLeader(source)) {
393 log.debug("Skip {} due to lack of leadership", mcastIp);
Charles Chanc91c8782016-03-30 17:54:24 -0700394 return;
395 }
Charles Chanc91c8782016-03-30 17:54:24 -0700396
Pier Luigi35dab3f2018-01-25 16:16:02 +0100397 // When source and sink are on the same device
398 if (source.deviceId().equals(sink.deviceId())) {
399 // Source and sink are on even the same port. There must be something wrong.
400 if (source.port().equals(sink.port())) {
401 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
402 mcastIp, sink, source);
403 return;
404 }
405 removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(source));
406 return;
407 }
Charles Chanc91c8782016-03-30 17:54:24 -0700408
Pier Luigi35dab3f2018-01-25 16:16:02 +0100409 // Process the egress device
410 boolean isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(null));
411 if (isLast) {
412 mcastRoleStore.remove(new McastStoreKey(mcastIp, sink.deviceId()));
413 }
414
415 // If this is the last sink on the device, also update upstream
416 Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp);
417 if (mcastPath.isPresent()) {
418 List<Link> links = Lists.newArrayList(mcastPath.get().links());
419 Collections.reverse(links);
420 for (Link link : links) {
421 if (isLast) {
422 isLast = removePortFromDevice(
423 link.src().deviceId(),
424 link.src().port(),
425 mcastIp,
426 assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null)
427 );
428 mcastRoleStore.remove(new McastStoreKey(mcastIp, link.src().deviceId()));
429 }
Charles Chanc91c8782016-03-30 17:54:24 -0700430 }
431 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100432 } finally {
433 mcastUnlock();
Charles Chanc91c8782016-03-30 17:54:24 -0700434 }
435 }
436
437 /**
438 * Establishes a path from source to sink for given multicast group.
439 *
440 * @param source connect point of the multicast source
441 * @param sink connection point of the multicast sink
442 * @param mcastIp multicast group IP address
443 */
444 private void processSinkAddedInternal(ConnectPoint source, ConnectPoint sink,
445 IpAddress mcastIp) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100446 lastMcastChange = Instant.now();
447 mcastLock();
448 try {
449 // Continue only when this instance is the master of source device
450 if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
451 log.debug("Skip {} due to lack of mastership of the source device {}",
452 mcastIp, source.deviceId());
Charles Chanc91c8782016-03-30 17:54:24 -0700453 return;
454 }
Charles Chanc91c8782016-03-30 17:54:24 -0700455
Pier Luigi35dab3f2018-01-25 16:16:02 +0100456 // Process the ingress device
457 addFilterToDevice(source.deviceId(), source.port(), assignedVlan(source), mcastIp);
Charles Chan72779502016-04-23 17:36:10 -0700458
Pier Luigi35dab3f2018-01-25 16:16:02 +0100459 // When source and sink are on the same device
460 if (source.deviceId().equals(sink.deviceId())) {
461 // Source and sink are on even the same port. There must be something wrong.
462 if (source.port().equals(sink.port())) {
463 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
464 mcastIp, sink, source);
465 return;
466 }
467 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(source));
468 mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()), McastRole.INGRESS);
469 return;
470 }
Charles Chan72779502016-04-23 17:36:10 -0700471
Pier Luigi35dab3f2018-01-25 16:16:02 +0100472 // Find a path. If present, create/update groups and flows for each hop
473 Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp);
474 if (mcastPath.isPresent()) {
475 List<Link> links = mcastPath.get().links();
476 checkState(links.size() == 2,
477 "Path in leaf-spine topology should always be two hops: ", links);
Charles Chan72779502016-04-23 17:36:10 -0700478
Pier Luigi35dab3f2018-01-25 16:16:02 +0100479 links.forEach(link -> {
480 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
481 assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
482 addFilterToDevice(link.dst().deviceId(), link.dst().port(), assignedVlan(null), mcastIp);
483 });
484
485 // Process the egress device
486 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(null));
487
488 // Setup mcast roles
489 mcastRoleStore.put(new McastStoreKey(mcastIp, source.deviceId()),
490 McastRole.INGRESS);
491 mcastRoleStore.put(new McastStoreKey(mcastIp, links.get(0).dst().deviceId()),
492 McastRole.TRANSIT);
493 mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()),
494 McastRole.EGRESS);
495 } else {
496 log.warn("Unable to find a path from {} to {}. Abort sinkAdded",
497 source.deviceId(), sink.deviceId());
498 }
499 } finally {
500 mcastUnlock();
Charles Chanc91c8782016-03-30 17:54:24 -0700501 }
502 }
503
504 /**
Charles Chan72779502016-04-23 17:36:10 -0700505 * Processes the LINK_DOWN event.
506 *
507 * @param affectedLink Link that is going down
508 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100509 public void processLinkDown(Link affectedLink) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100510 lastMcastChange = Instant.now();
511 mcastLock();
512 try {
513 // Get groups affected by the link down event
514 getAffectedGroups(affectedLink).forEach(mcastIp -> {
515 // TODO Optimize when the group editing is in place
516 log.debug("Processing link down {} for group {}",
517 affectedLink, mcastIp);
Pier Luigi580fd8a2018-01-16 10:47:50 +0100518
Pier Luigi35dab3f2018-01-25 16:16:02 +0100519 // Find out the ingress, transit and egress device of affected group
520 DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
521 .stream().findAny().orElse(null);
522 DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT)
523 .stream().findAny().orElse(null);
524 Set<DeviceId> egressDevices = getDevice(mcastIp, McastRole.EGRESS);
525 ConnectPoint source = getSource(mcastIp);
Charles Chana8f9dee2016-05-16 18:44:13 -0700526
Pier Luigi35dab3f2018-01-25 16:16:02 +0100527 // Do not proceed if any of these info is missing
528 if (ingressDevice == null || transitDevice == null
529 || egressDevices == null || source == null) {
530 log.warn("Missing ingress {}, transit {}, egress {} devices or source {}",
531 ingressDevice, transitDevice, egressDevices, source);
532 return;
Charles Chan72779502016-04-23 17:36:10 -0700533 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100534
535 // Continue only when this instance is the master of source device
536 if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
537 log.debug("Skip {} due to lack of mastership of the source device {}",
538 source.deviceId());
539 return;
540 }
541
542 // Remove entire transit
543 removeGroupFromDevice(transitDevice, mcastIp, assignedVlan(null));
544
545 // Remove transit-facing port on ingress device
546 PortNumber ingressTransitPort = ingressTransitPort(mcastIp);
547 if (ingressTransitPort != null) {
548 removePortFromDevice(ingressDevice, ingressTransitPort, mcastIp, assignedVlan(source));
549 mcastRoleStore.remove(new McastStoreKey(mcastIp, transitDevice));
550 }
551
552 // Construct a new path for each egress device
553 egressDevices.forEach(egressDevice -> {
554 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice, mcastIp);
555 if (mcastPath.isPresent()) {
556 installPath(mcastIp, source, mcastPath.get());
557 } else {
558 log.warn("Fail to recover egress device {} from link failure {}",
559 egressDevice, affectedLink);
560 removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null));
561 }
562 });
Charles Chan72779502016-04-23 17:36:10 -0700563 });
Pier Luigi35dab3f2018-01-25 16:16:02 +0100564 } finally {
565 mcastUnlock();
566 }
Charles Chan72779502016-04-23 17:36:10 -0700567 }
568
569 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +0100570 * Process the DEVICE_DOWN event.
571 *
572 * @param deviceDown device going down
573 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100574 public void processDeviceDown(DeviceId deviceDown) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100575 lastMcastChange = Instant.now();
576 mcastLock();
577 try {
578 // Get the mcast groups affected by the device going down
579 getAffectedGroups(deviceDown).forEach(mcastIp -> {
580 // TODO Optimize when the group editing is in place
581 log.debug("Processing device down {} for group {}",
582 deviceDown, mcastIp);
Pier Luigi580fd8a2018-01-16 10:47:50 +0100583
Pier Luigi35dab3f2018-01-25 16:16:02 +0100584 // Find out the ingress, transit and egress device of affected group
585 DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
586 .stream().findAny().orElse(null);
587 DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT)
588 .stream().findAny().orElse(null);
589 Set<DeviceId> egressDevices = getDevice(mcastIp, McastRole.EGRESS);
590 ConnectPoint source = getSource(mcastIp);
Pier Luigi580fd8a2018-01-16 10:47:50 +0100591
Pier Luigi35dab3f2018-01-25 16:16:02 +0100592 // Do not proceed if ingress device or source of this group are missing
593 // If sinks are in other leafs, we have ingress, transit, egress, and source
594 // If sinks are in the same leaf, we have just ingress and source
595 if (ingressDevice == null || source == null) {
596 log.warn("Missing ingress {} or source {} for group {}",
597 ingressDevice, source, mcastIp);
Pier Luigi580fd8a2018-01-16 10:47:50 +0100598 return;
599 }
Pier Luigi580fd8a2018-01-16 10:47:50 +0100600
Pier Luigi6786b922018-02-02 16:19:11 +0100601 // Verify leadership on the operation
602 if (!isLeader(source)) {
603 log.debug("Skip {} due to lack of leadership", mcastIp);
604 return;
Pier Luigi580fd8a2018-01-16 10:47:50 +0100605 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100606
607 // If it exists, we have to remove it in any case
608 if (transitDevice != null) {
609 // Remove entire transit
610 removeGroupFromDevice(transitDevice, mcastIp, assignedVlan(null));
611 }
612 // If the ingress is down
613 if (ingressDevice.equals(deviceDown)) {
614 // Remove entire ingress
615 removeGroupFromDevice(ingressDevice, mcastIp, assignedVlan(source));
616 // If other sinks different from the ingress exist
617 if (!egressDevices.isEmpty()) {
618 // Remove all the remaining egress
619 egressDevices.forEach(
620 egressDevice -> removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null))
621 );
Pier Luigi580fd8a2018-01-16 10:47:50 +0100622 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100623 } else {
624 // Egress or transit could be down at this point
625 // Get the ingress-transit port if it exists
626 PortNumber ingressTransitPort = ingressTransitPort(mcastIp);
627 if (ingressTransitPort != null) {
628 // Remove transit-facing port on ingress device
629 removePortFromDevice(ingressDevice, ingressTransitPort, mcastIp, assignedVlan(source));
630 }
631 // One of the egress device is down
632 if (egressDevices.contains(deviceDown)) {
633 // Remove entire device down
634 removeGroupFromDevice(deviceDown, mcastIp, assignedVlan(null));
635 // Remove the device down from egress
636 egressDevices.remove(deviceDown);
637 // If there are no more egress and ingress does not have sinks
638 if (egressDevices.isEmpty() && !hasSinks(ingressDevice, mcastIp)) {
639 // Remove entire ingress
640 mcastRoleStore.remove(new McastStoreKey(mcastIp, ingressDevice));
641 // We have done
642 return;
643 }
644 }
645 // Construct a new path for each egress device
646 egressDevices.forEach(egressDevice -> {
647 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice, mcastIp);
648 // If there is a new path
649 if (mcastPath.isPresent()) {
650 // Let's install the new mcast path for this egress
651 installPath(mcastIp, source, mcastPath.get());
652 } else {
653 // We were not able to find an alternative path for this egress
654 log.warn("Fail to recover egress device {} from device down {}",
655 egressDevice, deviceDown);
656 removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null));
657 }
658 });
659 }
660 });
661 } finally {
662 mcastUnlock();
663 }
Pier Luigi580fd8a2018-01-16 10:47:50 +0100664 }
665
666 /**
Charles Chanc91c8782016-03-30 17:54:24 -0700667 * Adds filtering objective for given device and port.
668 *
669 * @param deviceId device ID
670 * @param port ingress port number
671 * @param assignedVlan assigned VLAN ID
672 */
Julia Fergusonf1d9c342017-08-10 18:15:24 +0000673 private void addFilterToDevice(DeviceId deviceId, PortNumber port, VlanId assignedVlan, IpAddress mcastIp) {
Charles Chanc91c8782016-03-30 17:54:24 -0700674 // Do nothing if the port is configured as suppressed
Charles Chan370a65b2016-05-10 17:29:47 -0700675 ConnectPoint connectPoint = new ConnectPoint(deviceId, port);
676 SegmentRoutingAppConfig appConfig = srManager.cfgService
Pier Luigi69f774d2018-02-28 12:10:50 +0100677 .getConfig(srManager.appId(), SegmentRoutingAppConfig.class);
Charles Chan370a65b2016-05-10 17:29:47 -0700678 if (appConfig != null && appConfig.suppressSubnet().contains(connectPoint)) {
679 log.info("Ignore suppressed port {}", connectPoint);
Charles Chanc91c8782016-03-30 17:54:24 -0700680 return;
681 }
682
683 FilteringObjective.Builder filtObjBuilder =
Julia Fergusonf1d9c342017-08-10 18:15:24 +0000684 filterObjBuilder(deviceId, port, assignedVlan, mcastIp);
Charles Chan72779502016-04-23 17:36:10 -0700685 ObjectiveContext context = new DefaultObjectiveContext(
686 (objective) -> log.debug("Successfully add filter on {}/{}, vlan {}",
Charles Chan10b0fb72017-02-02 16:20:42 -0800687 deviceId, port.toLong(), assignedVlan),
Charles Chan72779502016-04-23 17:36:10 -0700688 (objective, error) ->
689 log.warn("Failed to add filter on {}/{}, vlan {}: {}",
Charles Chan10b0fb72017-02-02 16:20:42 -0800690 deviceId, port.toLong(), assignedVlan, error));
Charles Chan72779502016-04-23 17:36:10 -0700691 srManager.flowObjectiveService.filter(deviceId, filtObjBuilder.add(context));
Charles Chanc91c8782016-03-30 17:54:24 -0700692 }
693
694 /**
695 * Adds a port to given multicast group on given device. This involves the
696 * update of L3 multicast group and multicast routing table entry.
697 *
698 * @param deviceId device ID
699 * @param port port to be added
700 * @param mcastIp multicast group
701 * @param assignedVlan assigned VLAN ID
702 */
703 private void addPortToDevice(DeviceId deviceId, PortNumber port,
704 IpAddress mcastIp, VlanId assignedVlan) {
Charles Chan72779502016-04-23 17:36:10 -0700705 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId);
Charles Chanc91c8782016-03-30 17:54:24 -0700706 ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
Pier Luigi4f0dd212018-01-19 10:24:53 +0100707 NextObjective newNextObj;
Charles Chan72779502016-04-23 17:36:10 -0700708 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Charles Chanc91c8782016-03-30 17:54:24 -0700709 // First time someone request this mcast group via this device
710 portBuilder.add(port);
Pier Luigi4f0dd212018-01-19 10:24:53 +0100711 // New nextObj
712 newNextObj = nextObjBuilder(mcastIp, assignedVlan,
713 portBuilder.build(), null).add();
714 // Store the new port
715 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Charles Chanc91c8782016-03-30 17:54:24 -0700716 } else {
717 // This device already serves some subscribers of this mcast group
Charles Chan72779502016-04-23 17:36:10 -0700718 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chanc91c8782016-03-30 17:54:24 -0700719 // Stop if the port is already in the nextobj
720 Set<PortNumber> existingPorts = getPorts(nextObj.next());
721 if (existingPorts.contains(port)) {
722 log.info("NextObj for {}/{} already exists. Abort", deviceId, port);
723 return;
724 }
Pier Luigi4f0dd212018-01-19 10:24:53 +0100725 // Let's add the port and reuse the previous one
Yuta HIGUCHIbef07b52018-02-09 18:05:23 -0800726 portBuilder.addAll(existingPorts).add(port);
Pier Luigi4f0dd212018-01-19 10:24:53 +0100727 // Reuse previous nextObj
728 newNextObj = nextObjBuilder(mcastIp, assignedVlan,
729 portBuilder.build(), nextObj.id()).addToExisting();
730 // Store the final next objective and send only the difference to the driver
731 mcastNextObjStore.put(mcastStoreKey, newNextObj);
732 // Add just the new port
733 portBuilder = ImmutableSet.builder();
734 portBuilder.add(port);
735 newNextObj = nextObjBuilder(mcastIp, assignedVlan,
736 portBuilder.build(), nextObj.id()).addToExisting();
Charles Chanc91c8782016-03-30 17:54:24 -0700737 }
738 // Create, store and apply the new nextObj and fwdObj
Charles Chan72779502016-04-23 17:36:10 -0700739 ObjectiveContext context = new DefaultObjectiveContext(
740 (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}",
741 mcastIp, deviceId, port.toLong(), assignedVlan),
742 (objective, error) ->
743 log.warn("Failed to add {} on {}/{}, vlan {}: {}",
744 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Charles Chanc91c8782016-03-30 17:54:24 -0700745 ForwardingObjective fwdObj =
Charles Chan72779502016-04-23 17:36:10 -0700746 fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add(context);
Charles Chanc91c8782016-03-30 17:54:24 -0700747 srManager.flowObjectiveService.next(deviceId, newNextObj);
748 srManager.flowObjectiveService.forward(deviceId, fwdObj);
Charles Chanc91c8782016-03-30 17:54:24 -0700749 }
750
751 /**
752 * Removes a port from given multicast group on given device.
753 * This involves the update of L3 multicast group and multicast routing
754 * table entry.
755 *
756 * @param deviceId device ID
757 * @param port port to be added
758 * @param mcastIp multicast group
759 * @param assignedVlan assigned VLAN ID
760 * @return true if this is the last sink on this device
761 */
762 private boolean removePortFromDevice(DeviceId deviceId, PortNumber port,
763 IpAddress mcastIp, VlanId assignedVlan) {
Charles Chan72779502016-04-23 17:36:10 -0700764 McastStoreKey mcastStoreKey =
765 new McastStoreKey(mcastIp, deviceId);
Charles Chanc91c8782016-03-30 17:54:24 -0700766 // This device is not serving this multicast group
Charles Chan72779502016-04-23 17:36:10 -0700767 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Charles Chanc91c8782016-03-30 17:54:24 -0700768 log.warn("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
769 return false;
770 }
Charles Chan72779502016-04-23 17:36:10 -0700771 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chanc91c8782016-03-30 17:54:24 -0700772
773 Set<PortNumber> existingPorts = getPorts(nextObj.next());
Charles Chan72779502016-04-23 17:36:10 -0700774 // This port does not serve this multicast group
Charles Chanc91c8782016-03-30 17:54:24 -0700775 if (!existingPorts.contains(port)) {
776 log.warn("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
777 return false;
778 }
779 // Copy and modify the ImmutableSet
780 existingPorts = Sets.newHashSet(existingPorts);
781 existingPorts.remove(port);
782
783 NextObjective newNextObj;
Pier Luigi8cd46de2018-01-19 10:24:53 +0100784 ObjectiveContext context;
Charles Chanc91c8782016-03-30 17:54:24 -0700785 ForwardingObjective fwdObj;
786 if (existingPorts.isEmpty()) {
Pier Luigi8cd46de2018-01-19 10:24:53 +0100787 // If this is the last sink, remove flows and last bucket
Charles Chanc91c8782016-03-30 17:54:24 -0700788 // NOTE: Rely on GroupStore garbage collection rather than explicitly
789 // remove L3MG since there might be other flows/groups refer to
790 // the same L2IG
Pier Luigi8cd46de2018-01-19 10:24:53 +0100791 context = new DefaultObjectiveContext(
Charles Chan72779502016-04-23 17:36:10 -0700792 (objective) -> log.debug("Successfully remove {} on {}/{}, vlan {}",
793 mcastIp, deviceId, port.toLong(), assignedVlan),
794 (objective, error) ->
795 log.warn("Failed to remove {} on {}/{}, vlan {}: {}",
796 mcastIp, deviceId, port.toLong(), assignedVlan, error));
797 fwdObj = fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
798 mcastNextObjStore.remove(mcastStoreKey);
Charles Chanc91c8782016-03-30 17:54:24 -0700799 } else {
800 // If this is not the last sink, update flows and groups
Pier Luigi8cd46de2018-01-19 10:24:53 +0100801 context = new DefaultObjectiveContext(
Charles Chan72779502016-04-23 17:36:10 -0700802 (objective) -> log.debug("Successfully update {} on {}/{}, vlan {}",
803 mcastIp, deviceId, port.toLong(), assignedVlan),
804 (objective, error) ->
805 log.warn("Failed to update {} on {}/{}, vlan {}: {}",
806 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Pier Luigi8cd46de2018-01-19 10:24:53 +0100807 // Here we store the next objective with the remaining port
808 newNextObj = nextObjBuilder(mcastIp, assignedVlan,
809 existingPorts, nextObj.id()).removeFromExisting();
Charles Chan82f19972016-05-17 13:13:55 -0700810 fwdObj = fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add(context);
Charles Chan72779502016-04-23 17:36:10 -0700811 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Charles Chanc91c8782016-03-30 17:54:24 -0700812 }
Pier Luigi8cd46de2018-01-19 10:24:53 +0100813 // Let's modify the next objective removing the bucket
814 newNextObj = nextObjBuilder(mcastIp, assignedVlan,
815 ImmutableSet.of(port), nextObj.id()).removeFromExisting();
816 srManager.flowObjectiveService.next(deviceId, newNextObj);
817 srManager.flowObjectiveService.forward(deviceId, fwdObj);
Charles Chanc91c8782016-03-30 17:54:24 -0700818 return existingPorts.isEmpty();
819 }
820
Charles Chan72779502016-04-23 17:36:10 -0700821 /**
822 * Removes entire group on given device.
823 *
824 * @param deviceId device ID
825 * @param mcastIp multicast group to be removed
826 * @param assignedVlan assigned VLAN ID
827 */
828 private void removeGroupFromDevice(DeviceId deviceId, IpAddress mcastIp,
829 VlanId assignedVlan) {
830 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId);
831 // This device is not serving this multicast group
832 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
833 log.warn("{} is not serving {}. Abort.", deviceId, mcastIp);
834 return;
835 }
836 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
837 // NOTE: Rely on GroupStore garbage collection rather than explicitly
838 // remove L3MG since there might be other flows/groups refer to
839 // the same L2IG
840 ObjectiveContext context = new DefaultObjectiveContext(
841 (objective) -> log.debug("Successfully remove {} on {}, vlan {}",
842 mcastIp, deviceId, assignedVlan),
843 (objective, error) ->
844 log.warn("Failed to remove {} on {}, vlan {}: {}",
845 mcastIp, deviceId, assignedVlan, error));
846 ForwardingObjective fwdObj = fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
847 srManager.flowObjectiveService.forward(deviceId, fwdObj);
848 mcastNextObjStore.remove(mcastStoreKey);
849 mcastRoleStore.remove(mcastStoreKey);
850 }
851
Pier Luigi580fd8a2018-01-16 10:47:50 +0100852 private void installPath(IpAddress mcastIp, ConnectPoint source, Path mcastPath) {
853 // Get Links
854 List<Link> links = mcastPath.links();
855 // For each link, modify the next on the source device adding the src port
856 // and a new filter objective on the destination port
857 links.forEach(link -> {
858 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
859 assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
860 addFilterToDevice(link.dst().deviceId(), link.dst().port(), assignedVlan(null),
861 mcastIp);
862 });
863 // Setup new transit mcast role
864 mcastRoleStore.put(new McastStoreKey(mcastIp, links.get(0).dst().deviceId()),
865 McastRole.TRANSIT);
Charles Chan72779502016-04-23 17:36:10 -0700866 }
867
Charles Chanc91c8782016-03-30 17:54:24 -0700868 /**
869 * Creates a next objective builder for multicast.
870 *
871 * @param mcastIp multicast group
872 * @param assignedVlan assigned VLAN ID
873 * @param outPorts set of output port numbers
874 * @return next objective builder
875 */
876 private NextObjective.Builder nextObjBuilder(IpAddress mcastIp,
Pier Luigi4f0dd212018-01-19 10:24:53 +0100877 VlanId assignedVlan, Set<PortNumber> outPorts, Integer nextId) {
878 // If nextId is null allocate a new one
879 if (nextId == null) {
880 nextId = srManager.flowObjectiveService.allocateNextId();
881 }
Charles Chanc91c8782016-03-30 17:54:24 -0700882
883 TrafficSelector metadata =
884 DefaultTrafficSelector.builder()
885 .matchVlanId(assignedVlan)
886 .matchIPDst(mcastIp.toIpPrefix())
887 .build();
888
889 NextObjective.Builder nextObjBuilder = DefaultNextObjective
890 .builder().withId(nextId)
Pier Luigi69f774d2018-02-28 12:10:50 +0100891 .withType(NextObjective.Type.BROADCAST).fromApp(srManager.appId())
Charles Chanc91c8782016-03-30 17:54:24 -0700892 .withMeta(metadata);
893
894 outPorts.forEach(port -> {
895 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
896 if (egressVlan().equals(VlanId.NONE)) {
897 tBuilder.popVlan();
898 }
899 tBuilder.setOutput(port);
900 nextObjBuilder.addTreatment(tBuilder.build());
901 });
902
903 return nextObjBuilder;
904 }
905
906 /**
907 * Creates a forwarding objective builder for multicast.
908 *
909 * @param mcastIp multicast group
910 * @param assignedVlan assigned VLAN ID
911 * @param nextId next ID of the L3 multicast group
912 * @return forwarding objective builder
913 */
914 private ForwardingObjective.Builder fwdObjBuilder(IpAddress mcastIp,
915 VlanId assignedVlan, int nextId) {
916 TrafficSelector.Builder sbuilder = DefaultTrafficSelector.builder();
Julia Fergusonf1d9c342017-08-10 18:15:24 +0000917 IpPrefix mcastPrefix = mcastIp.toIpPrefix();
918
919 if (mcastIp.isIp4()) {
920 sbuilder.matchEthType(Ethernet.TYPE_IPV4);
921 sbuilder.matchIPDst(mcastPrefix);
922 } else {
923 sbuilder.matchEthType(Ethernet.TYPE_IPV6);
924 sbuilder.matchIPv6Dst(mcastPrefix);
925 }
926
927
Charles Chanc91c8782016-03-30 17:54:24 -0700928 TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder();
929 metabuilder.matchVlanId(assignedVlan);
930
931 ForwardingObjective.Builder fwdBuilder = DefaultForwardingObjective.builder();
932 fwdBuilder.withSelector(sbuilder.build())
933 .withMeta(metabuilder.build())
934 .nextStep(nextId)
935 .withFlag(ForwardingObjective.Flag.SPECIFIC)
Pier Luigi69f774d2018-02-28 12:10:50 +0100936 .fromApp(srManager.appId())
Charles Chanc91c8782016-03-30 17:54:24 -0700937 .withPriority(SegmentRoutingService.DEFAULT_PRIORITY);
938 return fwdBuilder;
939 }
940
941 /**
942 * Creates a filtering objective builder for multicast.
943 *
944 * @param deviceId Device ID
945 * @param ingressPort ingress port of the multicast stream
946 * @param assignedVlan assigned VLAN ID
947 * @return filtering objective builder
948 */
949 private FilteringObjective.Builder filterObjBuilder(DeviceId deviceId, PortNumber ingressPort,
Julia Fergusonf1d9c342017-08-10 18:15:24 +0000950 VlanId assignedVlan, IpAddress mcastIp) {
Charles Chanc91c8782016-03-30 17:54:24 -0700951 FilteringObjective.Builder filtBuilder = DefaultFilteringObjective.builder();
Charles Chan0932eca2016-06-28 16:50:13 -0700952
Julia Fergusonf1d9c342017-08-10 18:15:24 +0000953 if (mcastIp.isIp4()) {
954 filtBuilder.withKey(Criteria.matchInPort(ingressPort))
955 .addCondition(Criteria.matchEthDstMasked(MacAddress.IPV4_MULTICAST,
956 MacAddress.IPV4_MULTICAST_MASK))
957 .addCondition(Criteria.matchVlanId(egressVlan()))
958 .withPriority(SegmentRoutingService.DEFAULT_PRIORITY);
959 } else {
960 filtBuilder.withKey(Criteria.matchInPort(ingressPort))
961 .addCondition(Criteria.matchEthDstMasked(MacAddress.IPV6_MULTICAST,
962 MacAddress.IPV6_MULTICAST_MASK))
963 .addCondition(Criteria.matchVlanId(egressVlan()))
964 .withPriority(SegmentRoutingService.DEFAULT_PRIORITY);
965 }
Charles Chan0932eca2016-06-28 16:50:13 -0700966 TrafficTreatment tt = DefaultTrafficTreatment.builder()
967 .pushVlan().setVlanId(assignedVlan).build();
968 filtBuilder.withMeta(tt);
969
Pier Luigi69f774d2018-02-28 12:10:50 +0100970 return filtBuilder.permit().fromApp(srManager.appId());
Charles Chanc91c8782016-03-30 17:54:24 -0700971 }
972
973 /**
974 * Gets output ports information from treatments.
975 *
976 * @param treatments collection of traffic treatments
977 * @return set of output port numbers
978 */
979 private Set<PortNumber> getPorts(Collection<TrafficTreatment> treatments) {
980 ImmutableSet.Builder<PortNumber> builder = ImmutableSet.builder();
981 treatments.forEach(treatment -> {
982 treatment.allInstructions().stream()
983 .filter(instr -> instr instanceof OutputInstruction)
984 .forEach(instr -> {
985 builder.add(((OutputInstruction) instr).port());
986 });
987 });
988 return builder.build();
989 }
990
Pier Luigi51ee7c02018-02-23 19:57:40 +0100991 // Utility method to verify is a link is a pair-link
992 private boolean isPairLink(Link link) {
993 // Take src id, src port, dst id and dst port
994 final DeviceId srcId = link.src().deviceId();
995 final PortNumber srcPort = link.src().port();
996 final DeviceId dstId = link.dst().deviceId();
997 final PortNumber dstPort = link.dst().port();
998 // init as true
999 boolean isPairLink = true;
1000 try {
1001 // If one of this condition is not true; it is not a pair link
Pier Luigi69f774d2018-02-28 12:10:50 +01001002 if (!(srManager.deviceConfiguration().isEdgeDevice(srcId) &&
1003 srManager.deviceConfiguration().isEdgeDevice(dstId) &&
1004 srManager.deviceConfiguration().getPairDeviceId(srcId).equals(dstId) &&
1005 srManager.deviceConfiguration().getPairLocalPort(srcId).equals(srcPort) &&
1006 srManager.deviceConfiguration().getPairLocalPort(dstId).equals(dstPort))) {
Pier Luigi51ee7c02018-02-23 19:57:40 +01001007 isPairLink = false;
1008 }
1009 } catch (DeviceConfigNotFoundException e) {
1010 // Configuration not provided
1011 log.warn("Could not check if the link {} is pairlink "
1012 + "config not yet provided", link);
1013 isPairLink = false;
1014 }
1015 return isPairLink;
1016 }
1017
Charles Chanc91c8782016-03-30 17:54:24 -07001018 /**
1019 * Gets a path from src to dst.
1020 * If a path was allocated before, returns the allocated path.
1021 * Otherwise, randomly pick one from available paths.
1022 *
1023 * @param src source device ID
1024 * @param dst destination device ID
1025 * @param mcastIp multicast group
1026 * @return an optional path from src to dst
1027 */
1028 private Optional<Path> getPath(DeviceId src, DeviceId dst, IpAddress mcastIp) {
Pier Luigid8a15162018-02-15 16:33:08 +01001029 // Takes a snapshot of the topology
1030 final Topology currentTopology = topologyService.currentTopology();
Charles Chanc91c8782016-03-30 17:54:24 -07001031 List<Path> allPaths = Lists.newArrayList(
Pier Luigid8a15162018-02-15 16:33:08 +01001032 topologyService.getPaths(currentTopology, src, dst)
1033 );
Pier Luigi51ee7c02018-02-23 19:57:40 +01001034 // Create list of valid paths
1035 allPaths.removeIf(path -> path.links().stream().anyMatch(this::isPairLink));
1036 // If there are no valid paths, just exit
Charles Chan72779502016-04-23 17:36:10 -07001037 log.debug("{} path(s) found from {} to {}", allPaths.size(), src, dst);
Charles Chanc91c8782016-03-30 17:54:24 -07001038 if (allPaths.isEmpty()) {
Charles Chanc91c8782016-03-30 17:54:24 -07001039 return Optional.empty();
1040 }
1041
Pier Luigi91573e12018-01-23 16:06:38 +01001042 // Create a map index of suitablity-to-list of paths. For example
1043 // a path in the list associated to the index 1 shares only the
1044 // first hop and it is less suitable of a path belonging to the index
1045 // 2 that shares leaf-spine.
1046 Map<Integer, List<Path>> eligiblePaths = Maps.newHashMap();
1047 // Some init steps
1048 int nhop;
1049 McastStoreKey mcastStoreKey;
1050 Link hop;
1051 PortNumber srcPort;
1052 Set<PortNumber> existingPorts;
1053 NextObjective nextObj;
1054 // Iterate over paths looking for eligible paths
1055 for (Path path : allPaths) {
1056 // Unlikely, it will happen...
1057 if (!src.equals(path.links().get(0).src().deviceId())) {
1058 continue;
1059 }
1060 nhop = 0;
1061 // Iterate over the links
1062 while (nhop < path.links().size()) {
1063 // Get the link and verify if a next related
1064 // to the src device exist in the store
1065 hop = path.links().get(nhop);
1066 mcastStoreKey = new McastStoreKey(mcastIp, hop.src().deviceId());
1067 // It does not exist in the store, exit
1068 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
1069 break;
Charles Chanc91c8782016-03-30 17:54:24 -07001070 }
Pier Luigi91573e12018-01-23 16:06:38 +01001071 // Get the output ports on the next
1072 nextObj = mcastNextObjStore.get(mcastStoreKey).value();
1073 existingPorts = getPorts(nextObj.next());
1074 // And the src port on the link
1075 srcPort = hop.src().port();
1076 // the src port is not used as output, exit
1077 if (!existingPorts.contains(srcPort)) {
1078 break;
1079 }
1080 nhop++;
1081 }
1082 // n_hop defines the index
1083 if (nhop > 0) {
1084 eligiblePaths.compute(nhop, (index, paths) -> {
1085 paths = paths == null ? Lists.newArrayList() : paths;
1086 paths.add(path);
1087 return paths;
1088 });
Charles Chanc91c8782016-03-30 17:54:24 -07001089 }
1090 }
Pier Luigi91573e12018-01-23 16:06:38 +01001091
1092 // No suitable paths
1093 if (eligiblePaths.isEmpty()) {
1094 log.debug("No eligiblePath(s) found from {} to {}", src, dst);
1095 // Otherwise, randomly pick a path
1096 Collections.shuffle(allPaths);
1097 return allPaths.stream().findFirst();
1098 }
1099
1100 // Let's take the best ones
1101 Integer bestIndex = eligiblePaths.keySet()
1102 .stream()
1103 .sorted(Comparator.reverseOrder())
1104 .findFirst().orElse(null);
1105 List<Path> bestPaths = eligiblePaths.get(bestIndex);
1106 log.debug("{} eligiblePath(s) found from {} to {}",
1107 bestPaths.size(), src, dst);
1108 // randomly pick a path on the highest index
1109 Collections.shuffle(bestPaths);
1110 return bestPaths.stream().findFirst();
Charles Chanc91c8782016-03-30 17:54:24 -07001111 }
1112
1113 /**
Charles Chan72779502016-04-23 17:36:10 -07001114 * Gets device(s) of given role in given multicast group.
1115 *
1116 * @param mcastIp multicast IP
1117 * @param role multicast role
1118 * @return set of device ID or empty set if not found
1119 */
1120 private Set<DeviceId> getDevice(IpAddress mcastIp, McastRole role) {
1121 return mcastRoleStore.entrySet().stream()
1122 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
1123 entry.getValue().value() == role)
1124 .map(Map.Entry::getKey).map(McastStoreKey::deviceId)
1125 .collect(Collectors.toSet());
1126 }
1127
1128 /**
Charles Chana8f9dee2016-05-16 18:44:13 -07001129 * Gets source connect point of given multicast group.
1130 *
1131 * @param mcastIp multicast IP
1132 * @return source connect point or null if not found
1133 */
1134 private ConnectPoint getSource(IpAddress mcastIp) {
1135 return srManager.multicastRouteService.getRoutes().stream()
1136 .filter(mcastRoute -> mcastRoute.group().equals(mcastIp))
1137 .map(mcastRoute -> srManager.multicastRouteService.fetchSource(mcastRoute))
1138 .findAny().orElse(null);
1139 }
1140
1141 /**
Charles Chan72779502016-04-23 17:36:10 -07001142 * Gets groups which is affected by the link down event.
1143 *
1144 * @param link link going down
1145 * @return a set of multicast IpAddress
1146 */
1147 private Set<IpAddress> getAffectedGroups(Link link) {
1148 DeviceId deviceId = link.src().deviceId();
1149 PortNumber port = link.src().port();
1150 return mcastNextObjStore.entrySet().stream()
1151 .filter(entry -> entry.getKey().deviceId().equals(deviceId) &&
1152 getPorts(entry.getValue().value().next()).contains(port))
1153 .map(Map.Entry::getKey).map(McastStoreKey::mcastIp)
1154 .collect(Collectors.toSet());
1155 }
1156
1157 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +01001158 * Gets groups which are affected by the device down event.
1159 *
1160 * @param deviceId device going down
1161 * @return a set of multicast IpAddress
1162 */
1163 private Set<IpAddress> getAffectedGroups(DeviceId deviceId) {
1164 return mcastNextObjStore.entrySet().stream()
1165 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
1166 .map(Map.Entry::getKey).map(McastStoreKey::mcastIp)
1167 .collect(Collectors.toSet());
1168 }
1169
1170 /**
Charles Chanc91c8782016-03-30 17:54:24 -07001171 * Gets egress VLAN from McastConfig.
1172 *
1173 * @return egress VLAN or VlanId.NONE if not configured
1174 */
1175 private VlanId egressVlan() {
1176 McastConfig mcastConfig =
1177 srManager.cfgService.getConfig(coreAppId, McastConfig.class);
1178 return (mcastConfig != null) ? mcastConfig.egressVlan() : VlanId.NONE;
1179 }
1180
1181 /**
1182 * Gets assigned VLAN according to the value of egress VLAN.
Charles Chana8f9dee2016-05-16 18:44:13 -07001183 * If connect point is specified, try to reuse the assigned VLAN on the connect point.
Charles Chanc91c8782016-03-30 17:54:24 -07001184 *
Charles Chana8f9dee2016-05-16 18:44:13 -07001185 * @param cp connect point; Can be null if not specified
1186 * @return assigned VLAN ID
Charles Chanc91c8782016-03-30 17:54:24 -07001187 */
Charles Chana8f9dee2016-05-16 18:44:13 -07001188 private VlanId assignedVlan(ConnectPoint cp) {
1189 // Use the egressVlan if it is tagged
1190 if (!egressVlan().equals(VlanId.NONE)) {
1191 return egressVlan();
1192 }
1193 // Reuse unicast VLAN if the port has subnet configured
1194 if (cp != null) {
Charles Chanf9759582017-10-20 19:09:16 -07001195 VlanId untaggedVlan = srManager.getInternalVlanId(cp);
Charles Chan10b0fb72017-02-02 16:20:42 -08001196 return (untaggedVlan != null) ? untaggedVlan : INTERNAL_VLAN;
Charles Chana8f9dee2016-05-16 18:44:13 -07001197 }
Charles Chan10b0fb72017-02-02 16:20:42 -08001198 // Use DEFAULT_VLAN if none of the above matches
1199 return SegmentRoutingManager.INTERNAL_VLAN;
Charles Chanc91c8782016-03-30 17:54:24 -07001200 }
Charles Chan72779502016-04-23 17:36:10 -07001201
1202 /**
1203 * Gets the spine-facing port on ingress device of given multicast group.
1204 *
1205 * @param mcastIp multicast IP
1206 * @return spine-facing port on ingress device
1207 */
1208 private PortNumber ingressTransitPort(IpAddress mcastIp) {
1209 DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
1210 .stream().findAny().orElse(null);
1211 if (ingressDevice != null) {
1212 NextObjective nextObj = mcastNextObjStore
1213 .get(new McastStoreKey(mcastIp, ingressDevice)).value();
1214 Set<PortNumber> ports = getPorts(nextObj.next());
1215
1216 for (PortNumber port : ports) {
1217 // Spine-facing port should have no subnet and no xconnect
Pier Luigi69f774d2018-02-28 12:10:50 +01001218 if (srManager.deviceConfiguration() != null &&
1219 srManager.deviceConfiguration().getPortSubnets(ingressDevice, port).isEmpty() &&
Charles Chan82f19972016-05-17 13:13:55 -07001220 !srManager.xConnectHandler.hasXConnect(new ConnectPoint(ingressDevice, port))) {
Charles Chan72779502016-04-23 17:36:10 -07001221 return port;
1222 }
1223 }
1224 }
1225 return null;
1226 }
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001227
1228 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +01001229 * Verify if the given device has sinks
1230 * for the multicast group.
1231 *
1232 * @param deviceId device Id
1233 * @param mcastIp multicast IP
1234 * @return true if the device has sink for the group.
1235 * False otherwise.
1236 */
1237 private boolean hasSinks(DeviceId deviceId, IpAddress mcastIp) {
1238 if (deviceId != null) {
1239 // Get the nextobjective
1240 Versioned<NextObjective> versionedNextObj = mcastNextObjStore.get(
1241 new McastStoreKey(mcastIp, deviceId)
1242 );
1243 // If it exists
1244 if (versionedNextObj != null) {
1245 NextObjective nextObj = versionedNextObj.value();
1246 // Retrieves all the output ports
1247 Set<PortNumber> ports = getPorts(nextObj.next());
1248 // Tries to find at least one port that is not spine-facing
1249 for (PortNumber port : ports) {
1250 // Spine-facing port should have no subnet and no xconnect
Pier Luigi69f774d2018-02-28 12:10:50 +01001251 if (srManager.deviceConfiguration() != null &&
1252 (!srManager.deviceConfiguration().getPortSubnets(deviceId, port).isEmpty() ||
Pier Luigi580fd8a2018-01-16 10:47:50 +01001253 srManager.xConnectHandler.hasXConnect(new ConnectPoint(deviceId, port)))) {
1254 return true;
1255 }
1256 }
1257 }
1258 }
1259 return false;
1260 }
1261
1262 /**
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001263 * Removes filtering objective for given device and port.
1264 *
1265 * @param deviceId device ID
1266 * @param port ingress port number
1267 * @param assignedVlan assigned VLAN ID
1268 * @param mcastIp multicast IP address
1269 */
1270 private void removeFilterToDevice(DeviceId deviceId, PortNumber port, VlanId assignedVlan, IpAddress mcastIp) {
1271 // Do nothing if the port is configured as suppressed
1272 ConnectPoint connectPoint = new ConnectPoint(deviceId, port);
1273 SegmentRoutingAppConfig appConfig = srManager.cfgService
Pier Luigi69f774d2018-02-28 12:10:50 +01001274 .getConfig(srManager.appId(), SegmentRoutingAppConfig.class);
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001275 if (appConfig != null && appConfig.suppressSubnet().contains(connectPoint)) {
1276 log.info("Ignore suppressed port {}", connectPoint);
1277 return;
1278 }
1279
1280 FilteringObjective.Builder filtObjBuilder =
1281 filterObjBuilder(deviceId, port, assignedVlan, mcastIp);
1282 ObjectiveContext context = new DefaultObjectiveContext(
1283 (objective) -> log.debug("Successfully removed filter on {}/{}, vlan {}",
1284 deviceId, port.toLong(), assignedVlan),
1285 (objective, error) ->
1286 log.warn("Failed to remove filter on {}/{}, vlan {}: {}",
1287 deviceId, port.toLong(), assignedVlan, error));
1288 srManager.flowObjectiveService.filter(deviceId, filtObjBuilder.remove(context));
1289 }
1290
1291 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +01001292 * Updates filtering objective for given device and port.
1293 * It is called in general when the mcast config has been
1294 * changed.
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001295 *
1296 * @param deviceId device ID
1297 * @param portNum ingress port number
1298 * @param vlanId assigned VLAN ID
1299 * @param install true to add, false to remove
1300 */
Pier Luigi69f774d2018-02-28 12:10:50 +01001301 public void updateFilterToDevice(DeviceId deviceId, PortNumber portNum,
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001302 VlanId vlanId, boolean install) {
Pier Luigi35dab3f2018-01-25 16:16:02 +01001303 lastMcastChange = Instant.now();
1304 mcastLock();
1305 try {
1306 // Iterates over the route and updates properly the filtering objective
1307 // on the source device.
1308 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
1309 ConnectPoint source = srManager.multicastRouteService.fetchSource(mcastRoute);
1310 if (source.deviceId().equals(deviceId) && source.port().equals(portNum)) {
1311 if (install) {
1312 addFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group());
1313 } else {
1314 removeFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group());
1315 }
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001316 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001317 });
1318 } finally {
1319 mcastUnlock();
1320 }
1321 }
1322
Pier Luigi6786b922018-02-02 16:19:11 +01001323 private boolean isLeader(ConnectPoint source) {
1324 // Continue only when we have the mastership on the operation
1325 if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
1326 // When the source is available we just check the mastership
1327 if (srManager.deviceService.isAvailable(source.deviceId())) {
1328 return false;
1329 }
1330 // Fallback with Leadership service
1331 // source id is used a topic
1332 NodeId leader = srManager.leadershipService.runForLeadership(
1333 source.deviceId().toString()).leaderNodeId();
1334 // Verify if this node is the leader
1335 if (!srManager.clusterService.getLocalNode().id().equals(leader)) {
1336 return false;
1337 }
1338 }
1339 // Done
1340 return true;
1341 }
1342
Pier Luigi35dab3f2018-01-25 16:16:02 +01001343 /**
1344 * Performs bucket verification operation for all mcast groups in the devices.
1345 * Firstly, it verifies that mcast is stable before trying verification operation.
1346 * Verification consists in creating new nexts with VERIFY operation. Actually,
1347 * the operation is totally delegated to the driver.
1348 */
1349 private final class McastBucketCorrector implements Runnable {
1350
1351 @Override
1352 public void run() {
1353 // Verify if the Mcast has been stable for MCAST_STABLITY_THRESHOLD
1354 if (!isMcastStable()) {
1355 return;
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001356 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001357 // Acquires lock
1358 mcastLock();
1359 try {
1360 // Iterates over the routes and verify the related next objectives
1361 srManager.multicastRouteService.getRoutes()
1362 .stream()
1363 .map(McastRoute::group)
1364 .forEach(mcastIp -> {
1365 log.trace("Running mcast buckets corrector for mcast group: {}",
1366 mcastIp);
1367
1368 // For each group we get current information in the store
1369 // and issue a check of the next objectives in place
1370 DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
1371 .stream().findAny().orElse(null);
1372 DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT)
1373 .stream().findAny().orElse(null);
1374 Set<DeviceId> egressDevices = getDevice(mcastIp, McastRole.EGRESS);
1375 ConnectPoint source = getSource(mcastIp);
1376
1377 // Do not proceed if ingress device or source of this group are missing
1378 if (ingressDevice == null || source == null) {
1379 log.warn("Unable to run buckets corrector. " +
1380 "Missing ingress {} or source {} for group {}",
1381 ingressDevice, source, mcastIp);
1382 return;
1383 }
1384
1385 // Continue only when this instance is the master of source device
1386 if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
1387 log.trace("Unable to run buckets corrector. " +
1388 "Skip {} due to lack of mastership " +
1389 "of the source device {}",
1390 mcastIp, source.deviceId());
1391 return;
1392 }
1393
1394 // Create the set of the devices to be processed
1395 ImmutableSet.Builder<DeviceId> devicesBuilder = ImmutableSet.builder();
1396 devicesBuilder.add(ingressDevice);
1397 if (transitDevice != null) {
1398 devicesBuilder.add(transitDevice);
1399 }
1400 if (!egressDevices.isEmpty()) {
1401 devicesBuilder.addAll(egressDevices);
1402 }
1403 Set<DeviceId> devicesToProcess = devicesBuilder.build();
1404
1405 // Iterate over the devices
1406 devicesToProcess.forEach(deviceId -> {
1407 McastStoreKey currentKey = new McastStoreKey(mcastIp, deviceId);
1408 // If next exists in our store verify related next objective
1409 if (mcastNextObjStore.containsKey(currentKey)) {
1410 NextObjective currentNext = mcastNextObjStore.get(currentKey).value();
1411 // Get current ports
1412 Set<PortNumber> currentPorts = getPorts(currentNext.next());
1413 // Rebuild the next objective
1414 currentNext = nextObjBuilder(
1415 mcastIp,
1416 assignedVlan(deviceId.equals(source.deviceId()) ? source : null),
1417 currentPorts,
1418 currentNext.id()
1419 ).verify();
1420 // Send to the flowobjective service
1421 srManager.flowObjectiveService.next(deviceId, currentNext);
1422 } else {
Pier Luigid8a15162018-02-15 16:33:08 +01001423 log.warn("Unable to run buckets corrector. " +
Pier Luigi35dab3f2018-01-25 16:16:02 +01001424 "Missing next for {} and group {}",
1425 deviceId, mcastIp);
1426 }
1427 });
1428
1429 });
1430 } finally {
1431 // Finally, it releases the lock
1432 mcastUnlock();
1433 }
1434
1435 }
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001436 }
Pier Luigi0f9635b2018-01-15 18:06:43 +01001437
1438 public Map<McastStoreKey, Integer> getMcastNextIds(IpAddress mcastIp) {
1439 // If mcast ip is present
1440 if (mcastIp != null) {
1441 return mcastNextObjStore.entrySet().stream()
1442 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
1443 .collect(Collectors.toMap(Map.Entry::getKey,
1444 entry -> entry.getValue().value().id()));
1445 }
1446 // Otherwise take all the groups
1447 return mcastNextObjStore.entrySet().stream()
1448 .collect(Collectors.toMap(Map.Entry::getKey,
1449 entry -> entry.getValue().value().id()));
1450 }
1451
Pier Luigi69f774d2018-02-28 12:10:50 +01001452 public Map<McastStoreKey, McastRole> getMcastRoles(IpAddress mcastIp) {
Pier Luigi0f9635b2018-01-15 18:06:43 +01001453 // If mcast ip is present
1454 if (mcastIp != null) {
1455 return mcastRoleStore.entrySet().stream()
1456 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
1457 .collect(Collectors.toMap(Map.Entry::getKey,
1458 entry -> entry.getValue().value()));
1459 }
1460 // Otherwise take all the groups
1461 return mcastRoleStore.entrySet().stream()
1462 .collect(Collectors.toMap(Map.Entry::getKey,
1463 entry -> entry.getValue().value()));
1464 }
1465
1466 public Map<ConnectPoint, List<ConnectPoint>> getMcastPaths(IpAddress mcastIp) {
1467 Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
1468 // Get the source
1469 ConnectPoint source = getSource(mcastIp);
1470 // Source cannot be null, we don't know the starting point
1471 if (source != null) {
1472 // Init steps
1473 Set<DeviceId> visited = Sets.newHashSet();
1474 List<ConnectPoint> currentPath = Lists.newArrayList(
1475 source
1476 );
1477 // Build recursively the mcast paths
1478 buildMcastPaths(source.deviceId(), visited, mcastPaths, currentPath, mcastIp);
1479 }
1480 return mcastPaths;
1481 }
1482
1483 private void buildMcastPaths(DeviceId toVisit, Set<DeviceId> visited,
1484 Map<ConnectPoint, List<ConnectPoint>> mcastPaths,
1485 List<ConnectPoint> currentPath, IpAddress mcastIp) {
1486 // If we have visited the node to visit
1487 // there is a loop
1488 if (visited.contains(toVisit)) {
1489 return;
1490 }
1491 // Visit next-hop
1492 visited.add(toVisit);
1493 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, toVisit);
1494 // Looking for next-hops
1495 if (mcastNextObjStore.containsKey(mcastStoreKey)) {
1496 // Build egress connectpoints
1497 NextObjective nextObjective = mcastNextObjStore.get(mcastStoreKey).value();
1498 // Get Ports
1499 Set<PortNumber> outputPorts = getPorts(nextObjective.next());
1500 // Build relative cps
1501 ImmutableSet.Builder<ConnectPoint> cpBuilder = ImmutableSet.builder();
1502 outputPorts.forEach(portNumber -> cpBuilder.add(new ConnectPoint(toVisit, portNumber)));
1503 Set<ConnectPoint> egressPoints = cpBuilder.build();
1504 // Define other variables for the next steps
1505 Set<Link> egressLinks;
1506 List<ConnectPoint> newCurrentPath;
1507 Set<DeviceId> newVisited;
1508 DeviceId newToVisit;
1509 for (ConnectPoint egressPoint : egressPoints) {
1510 egressLinks = srManager.linkService.getEgressLinks(egressPoint);
1511 // If it does not have egress links, stop
1512 if (egressLinks.isEmpty()) {
1513 // Add the connect points to the path
1514 newCurrentPath = Lists.newArrayList(currentPath);
1515 newCurrentPath.add(0, egressPoint);
1516 // Save in the map
1517 mcastPaths.put(egressPoint, newCurrentPath);
1518 } else {
1519 newVisited = Sets.newHashSet(visited);
1520 // Iterate over the egress links for the next hops
1521 for (Link egressLink : egressLinks) {
1522 // Update to visit
1523 newToVisit = egressLink.dst().deviceId();
1524 // Add the connect points to the path
1525 newCurrentPath = Lists.newArrayList(currentPath);
1526 newCurrentPath.add(0, egressPoint);
1527 newCurrentPath.add(0, egressLink.dst());
1528 // Go to the next hop
1529 buildMcastPaths(newToVisit, newVisited, mcastPaths, newCurrentPath, mcastIp);
1530 }
1531 }
1532 }
1533 }
1534 }
1535
Charles Chanc91c8782016-03-30 17:54:24 -07001536}