blob: dc8a36dea5989bbb5f4ad044d1da59fc16a7c45c [file] [log] [blame]
Charles Chand55e84d2016-03-30 17:54:24 -07001/*
Pier Luigi004d7cf2018-02-28 12:10:50 +01002 * Copyright 2018-present Open Networking Foundation
Charles Chand55e84d2016-03-30 17:54:24 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Pier Luigi004d7cf2018-02-28 12:10:50 +010017package org.onosproject.segmentrouting.mcast;
Charles Chand55e84d2016-03-30 17:54:24 -070018
19import com.google.common.collect.ImmutableSet;
20import com.google.common.collect.Lists;
Pier Luigi5131dad2018-01-23 16:06:38 +010021import com.google.common.collect.Maps;
Charles Chand55e84d2016-03-30 17:54:24 -070022import com.google.common.collect.Sets;
23import org.onlab.packet.Ethernet;
24import org.onlab.packet.IpAddress;
25import org.onlab.packet.IpPrefix;
26import org.onlab.packet.MacAddress;
27import org.onlab.packet.VlanId;
28import org.onlab.util.KryoNamespace;
Pier Luigi24592652018-01-16 10:47:50 +010029import org.onosproject.cluster.NodeId;
Charles Chand55e84d2016-03-30 17:54:24 -070030import org.onosproject.core.ApplicationId;
31import org.onosproject.core.CoreService;
Ray Milkey6c1f0f02017-08-15 11:02:29 -070032import org.onosproject.net.config.basics.McastConfig;
Charles Chand55e84d2016-03-30 17:54:24 -070033import org.onosproject.net.ConnectPoint;
34import org.onosproject.net.DeviceId;
35import org.onosproject.net.Link;
36import org.onosproject.net.Path;
37import org.onosproject.net.PortNumber;
38import org.onosproject.net.flow.DefaultTrafficSelector;
39import org.onosproject.net.flow.DefaultTrafficTreatment;
40import org.onosproject.net.flow.TrafficSelector;
41import org.onosproject.net.flow.TrafficTreatment;
42import org.onosproject.net.flow.criteria.Criteria;
43import org.onosproject.net.flow.instructions.Instructions.OutputInstruction;
44import org.onosproject.net.flowobjective.DefaultFilteringObjective;
45import org.onosproject.net.flowobjective.DefaultForwardingObjective;
46import org.onosproject.net.flowobjective.DefaultNextObjective;
Charles Chan2199c302016-04-23 17:36:10 -070047import org.onosproject.net.flowobjective.DefaultObjectiveContext;
Charles Chand55e84d2016-03-30 17:54:24 -070048import org.onosproject.net.flowobjective.FilteringObjective;
49import org.onosproject.net.flowobjective.ForwardingObjective;
50import org.onosproject.net.flowobjective.NextObjective;
Charles Chan2199c302016-04-23 17:36:10 -070051import org.onosproject.net.flowobjective.ObjectiveContext;
Charles Chand55e84d2016-03-30 17:54:24 -070052import org.onosproject.net.mcast.McastEvent;
Pier Luigi3dfd8352018-01-25 16:16:02 +010053import org.onosproject.net.mcast.McastRoute;
Charles Chand55e84d2016-03-30 17:54:24 -070054import org.onosproject.net.mcast.McastRouteInfo;
Pier Luigi80cb7482018-02-15 16:33:08 +010055import org.onosproject.net.topology.Topology;
Charles Chand55e84d2016-03-30 17:54:24 -070056import org.onosproject.net.topology.TopologyService;
Pier Luigi004d7cf2018-02-28 12:10:50 +010057import org.onosproject.segmentrouting.SegmentRoutingManager;
58import org.onosproject.segmentrouting.SegmentRoutingService;
Pier Luigiba5a7c32018-02-23 19:57:40 +010059import org.onosproject.segmentrouting.config.DeviceConfigNotFoundException;
Charles Chan6ea94fc2016-05-10 17:29:47 -070060import org.onosproject.segmentrouting.config.SegmentRoutingAppConfig;
Charles Chan2199c302016-04-23 17:36:10 -070061import org.onosproject.segmentrouting.storekey.McastStoreKey;
Charles Chand55e84d2016-03-30 17:54:24 -070062import org.onosproject.store.serializers.KryoNamespaces;
63import org.onosproject.store.service.ConsistentMap;
64import org.onosproject.store.service.Serializer;
65import org.onosproject.store.service.StorageService;
Pier Luigi24592652018-01-16 10:47:50 +010066import org.onosproject.store.service.Versioned;
Charles Chand55e84d2016-03-30 17:54:24 -070067import org.slf4j.Logger;
68import org.slf4j.LoggerFactory;
69
Pier Luigi3dfd8352018-01-25 16:16:02 +010070import java.time.Instant;
Charles Chand55e84d2016-03-30 17:54:24 -070071import java.util.Collection;
72import java.util.Collections;
Pier Luigi5131dad2018-01-23 16:06:38 +010073import java.util.Comparator;
Charles Chand55e84d2016-03-30 17:54:24 -070074import java.util.List;
Charles Chan2199c302016-04-23 17:36:10 -070075import java.util.Map;
Charles Chand55e84d2016-03-30 17:54:24 -070076import java.util.Optional;
77import java.util.Set;
Pier Luigi3dfd8352018-01-25 16:16:02 +010078import java.util.concurrent.ScheduledExecutorService;
79import java.util.concurrent.TimeUnit;
80import java.util.concurrent.locks.Lock;
81import java.util.concurrent.locks.ReentrantLock;
Charles Chan2199c302016-04-23 17:36:10 -070082import java.util.stream.Collectors;
83
84import static com.google.common.base.Preconditions.checkState;
Pier Luigi3dfd8352018-01-25 16:16:02 +010085import static java.util.concurrent.Executors.newScheduledThreadPool;
86import static org.onlab.util.Tools.groupedThreads;
Charles Chan59cc16d2017-02-02 16:20:42 -080087import static org.onosproject.segmentrouting.SegmentRoutingManager.INTERNAL_VLAN;
Charles Chand55e84d2016-03-30 17:54:24 -070088
89/**
Pier Luigi004d7cf2018-02-28 12:10:50 +010090 * Handles Multicast related events.
Charles Chand55e84d2016-03-30 17:54:24 -070091 */
Charles Chand2990362016-04-18 13:44:03 -070092public class McastHandler {
93 private static final Logger log = LoggerFactory.getLogger(McastHandler.class);
Charles Chand55e84d2016-03-30 17:54:24 -070094 private final SegmentRoutingManager srManager;
95 private final ApplicationId coreAppId;
Charles Chanfc5c7802016-05-17 13:13:55 -070096 private final StorageService storageService;
97 private final TopologyService topologyService;
Charles Chan2199c302016-04-23 17:36:10 -070098 private final ConsistentMap<McastStoreKey, NextObjective> mcastNextObjStore;
99 private final KryoNamespace.Builder mcastKryo;
100 private final ConsistentMap<McastStoreKey, McastRole> mcastRoleStore;
101
Pier Luigi3dfd8352018-01-25 16:16:02 +0100102 // Mcast lock to serialize local operations
103 private final Lock mcastLock = new ReentrantLock();
104
105 /**
106 * Acquires the lock used when making mcast changes.
107 */
108 private void mcastLock() {
109 mcastLock.lock();
110 }
111
112 /**
113 * Releases the lock used when making mcast changes.
114 */
115 private void mcastUnlock() {
116 mcastLock.unlock();
117 }
118
119 // Stability threshold for Mcast. Seconds
120 private static final long MCAST_STABLITY_THRESHOLD = 5;
121 // Last change done
122 private Instant lastMcastChange = Instant.now();
123
124 /**
125 * Determines if mcast in the network has been stable in the last
126 * MCAST_STABLITY_THRESHOLD seconds, by comparing the current time
127 * to the last mcast change timestamp.
128 *
129 * @return true if stable
130 */
131 private boolean isMcastStable() {
132 long last = (long) (lastMcastChange.toEpochMilli() / 1000.0);
133 long now = (long) (Instant.now().toEpochMilli() / 1000.0);
Saurav Dasa89b95a2018-02-14 14:14:54 -0800134 log.trace("Mcast stable since {}s", now - last);
Pier Luigi3dfd8352018-01-25 16:16:02 +0100135 return (now - last) > MCAST_STABLITY_THRESHOLD;
136 }
137
138 // Verify interval for Mcast
139 private static final long MCAST_VERIFY_INTERVAL = 30;
140
141 // Executor for mcast bucket corrector
142 private ScheduledExecutorService executorService
143 = newScheduledThreadPool(1, groupedThreads("mcastBktCorrector", "mcastbktC-%d", log));
144
Charles Chan2199c302016-04-23 17:36:10 -0700145 /**
Charles Chand55e84d2016-03-30 17:54:24 -0700146 * Constructs the McastEventHandler.
147 *
148 * @param srManager Segment Routing manager
149 */
Charles Chand2990362016-04-18 13:44:03 -0700150 public McastHandler(SegmentRoutingManager srManager) {
Charles Chand55e84d2016-03-30 17:54:24 -0700151 coreAppId = srManager.coreService.getAppId(CoreService.CORE_APP_NAME);
Charles Chand55e84d2016-03-30 17:54:24 -0700152 this.srManager = srManager;
153 this.storageService = srManager.storageService;
154 this.topologyService = srManager.topologyService;
Charles Chan2199c302016-04-23 17:36:10 -0700155 mcastKryo = new KryoNamespace.Builder()
Charles Chand55e84d2016-03-30 17:54:24 -0700156 .register(KryoNamespaces.API)
Charles Chan2199c302016-04-23 17:36:10 -0700157 .register(McastStoreKey.class)
158 .register(McastRole.class);
Charles Chand55e84d2016-03-30 17:54:24 -0700159 mcastNextObjStore = storageService
Charles Chan2199c302016-04-23 17:36:10 -0700160 .<McastStoreKey, NextObjective>consistentMapBuilder()
Charles Chand55e84d2016-03-30 17:54:24 -0700161 .withName("onos-mcast-nextobj-store")
Charles Chaneefdedf2016-05-23 16:45:45 -0700162 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-NextObj")))
Charles Chand55e84d2016-03-30 17:54:24 -0700163 .build();
Charles Chan2199c302016-04-23 17:36:10 -0700164 mcastRoleStore = storageService
165 .<McastStoreKey, McastRole>consistentMapBuilder()
166 .withName("onos-mcast-role-store")
Charles Chaneefdedf2016-05-23 16:45:45 -0700167 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-Role")))
Charles Chan2199c302016-04-23 17:36:10 -0700168 .build();
Pier Luigi3dfd8352018-01-25 16:16:02 +0100169 // Init the executor service and the buckets corrector
170 executorService.scheduleWithFixedDelay(new McastBucketCorrector(), 10,
171 MCAST_VERIFY_INTERVAL,
172 TimeUnit.SECONDS);
Charles Chan2199c302016-04-23 17:36:10 -0700173 }
174
175 /**
176 * Read initial multicast from mcast store.
177 */
Pier Luigi004d7cf2018-02-28 12:10:50 +0100178 public void init() {
Charles Chan2199c302016-04-23 17:36:10 -0700179 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
180 ConnectPoint source = srManager.multicastRouteService.fetchSource(mcastRoute);
181 Set<ConnectPoint> sinks = srManager.multicastRouteService.fetchSinks(mcastRoute);
182 sinks.forEach(sink -> {
183 processSinkAddedInternal(source, sink, mcastRoute.group());
184 });
185 });
Charles Chand55e84d2016-03-30 17:54:24 -0700186 }
187
188 /**
Pier Luigi3dfd8352018-01-25 16:16:02 +0100189 * Clean up when deactivating the application.
190 */
Pier Luigi004d7cf2018-02-28 12:10:50 +0100191 public void terminate() {
Pier Luigi3dfd8352018-01-25 16:16:02 +0100192 executorService.shutdown();
193 }
194
195 /**
Charles Chand55e84d2016-03-30 17:54:24 -0700196 * Processes the SOURCE_ADDED event.
197 *
198 * @param event McastEvent with SOURCE_ADDED type
199 */
Pier Luigi004d7cf2018-02-28 12:10:50 +0100200 public void processSourceAdded(McastEvent event) {
Charles Chand55e84d2016-03-30 17:54:24 -0700201 log.info("processSourceAdded {}", event);
202 McastRouteInfo mcastRouteInfo = event.subject();
203 if (!mcastRouteInfo.isComplete()) {
204 log.info("Incompleted McastRouteInfo. Abort.");
205 return;
206 }
207 ConnectPoint source = mcastRouteInfo.source().orElse(null);
208 Set<ConnectPoint> sinks = mcastRouteInfo.sinks();
209 IpAddress mcastIp = mcastRouteInfo.route().group();
210
Pier Luigi3dfd8352018-01-25 16:16:02 +0100211 sinks.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp));
Charles Chand55e84d2016-03-30 17:54:24 -0700212 }
213
214 /**
Pier Luigi1f5dc372018-02-26 12:31:38 +0100215 * Processes the SOURCE_UPDATED event.
216 *
217 * @param event McastEvent with SOURCE_UPDATED type
218 */
Pier Luigi004d7cf2018-02-28 12:10:50 +0100219 public void processSourceUpdated(McastEvent event) {
Pier Luigi1f5dc372018-02-26 12:31:38 +0100220 log.info("processSourceUpdated {}", event);
221 // Get old and new data
222 McastRouteInfo mcastRouteInfo = event.subject();
223 ConnectPoint newSource = mcastRouteInfo.source().orElse(null);
224 mcastRouteInfo = event.prevSubject();
225 ConnectPoint oldSource = mcastRouteInfo.source().orElse(null);
226 // and group ip
227 IpAddress mcastIp = mcastRouteInfo.route().group();
228 // Process the update event
229 processSourceUpdatedInternal(mcastIp, newSource, oldSource);
230 }
231
232 /**
Charles Chand55e84d2016-03-30 17:54:24 -0700233 * Processes the SINK_ADDED event.
234 *
235 * @param event McastEvent with SINK_ADDED type
236 */
Pier Luigi004d7cf2018-02-28 12:10:50 +0100237 public void processSinkAdded(McastEvent event) {
Charles Chand55e84d2016-03-30 17:54:24 -0700238 log.info("processSinkAdded {}", event);
239 McastRouteInfo mcastRouteInfo = event.subject();
240 if (!mcastRouteInfo.isComplete()) {
241 log.info("Incompleted McastRouteInfo. Abort.");
242 return;
243 }
244 ConnectPoint source = mcastRouteInfo.source().orElse(null);
245 ConnectPoint sink = mcastRouteInfo.sink().orElse(null);
246 IpAddress mcastIp = mcastRouteInfo.route().group();
247
248 processSinkAddedInternal(source, sink, mcastIp);
249 }
250
251 /**
252 * Processes the SINK_REMOVED event.
253 *
254 * @param event McastEvent with SINK_REMOVED type
255 */
Pier Luigi004d7cf2018-02-28 12:10:50 +0100256 public void processSinkRemoved(McastEvent event) {
Charles Chand55e84d2016-03-30 17:54:24 -0700257 log.info("processSinkRemoved {}", event);
258 McastRouteInfo mcastRouteInfo = event.subject();
259 if (!mcastRouteInfo.isComplete()) {
260 log.info("Incompleted McastRouteInfo. Abort.");
261 return;
262 }
263 ConnectPoint source = mcastRouteInfo.source().orElse(null);
264 ConnectPoint sink = mcastRouteInfo.sink().orElse(null);
265 IpAddress mcastIp = mcastRouteInfo.route().group();
Charles Chand55e84d2016-03-30 17:54:24 -0700266
Pier Luigi3dfd8352018-01-25 16:16:02 +0100267 processSinkRemovedInternal(source, sink, mcastIp);
268 }
Charles Chan1588e7b2016-06-28 16:50:13 -0700269
Pier Luigi3dfd8352018-01-25 16:16:02 +0100270 /**
Pier Luigi1c34a442018-02-02 16:19:11 +0100271 * Processes the ROUTE_REMOVED event.
272 *
273 * @param event McastEvent with ROUTE_REMOVED type
274 */
Pier Luigi004d7cf2018-02-28 12:10:50 +0100275 public void processRouteRemoved(McastEvent event) {
Pier Luigi1c34a442018-02-02 16:19:11 +0100276 log.info("processRouteRemoved {}", event);
277 McastRouteInfo mcastRouteInfo = event.subject();
278 if (!mcastRouteInfo.source().isPresent()) {
279 log.info("Incompleted McastRouteInfo. Abort.");
280 return;
281 }
282 // Get group ip and ingress connect point
283 IpAddress mcastIp = mcastRouteInfo.route().group();
Pier Luigi1f5dc372018-02-26 12:31:38 +0100284 ConnectPoint source = mcastRouteInfo.source().orElse(null);
Pier Luigi1c34a442018-02-02 16:19:11 +0100285
286 processRouteRemovedInternal(source, mcastIp);
287 }
288
289 /**
Pier Luigi1f5dc372018-02-26 12:31:38 +0100290 * Process the SOURCE_UPDATED event.
291 *
292 * @param newSource the updated srouce info
293 * @param oldSource the outdated source info
294 */
295 private void processSourceUpdatedInternal(IpAddress mcastIp,
296 ConnectPoint newSource,
297 ConnectPoint oldSource) {
298 lastMcastChange = Instant.now();
299 mcastLock();
300 try {
301 log.debug("Processing source updated for group {}", mcastIp);
302
303 // Build key for the store and retrieve old data
304 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, oldSource.deviceId());
305
306 // Verify leadership on the operation
307 if (!isLeader(oldSource)) {
308 log.debug("Skip {} due to lack of leadership", mcastIp);
309 return;
310 }
311
312 // This device is not serving this multicast group
313 if (!mcastRoleStore.containsKey(mcastStoreKey) ||
314 !mcastNextObjStore.containsKey(mcastStoreKey)) {
315 log.warn("{} is not serving {}. Abort.", oldSource.deviceId(), mcastIp);
316 return;
317 }
318 NextObjective nextObjective = mcastNextObjStore.get(mcastStoreKey).value();
319 Set<PortNumber> outputPorts = getPorts(nextObjective.next());
320
321 // Let's remove old flows and groups
322 removeGroupFromDevice(oldSource.deviceId(), mcastIp, assignedVlan(oldSource));
323 // Push new flows and group
324 outputPorts.forEach(portNumber -> addPortToDevice(newSource.deviceId(), portNumber,
325 mcastIp, assignedVlan(newSource)));
326 addFilterToDevice(newSource.deviceId(), newSource.port(),
327 assignedVlan(newSource), mcastIp);
328 // Setup mcast roles
329 mcastRoleStore.put(new McastStoreKey(mcastIp, newSource.deviceId()),
330 McastRole.INGRESS);
331 } finally {
332 mcastUnlock();
333 }
334 }
335
336 /**
Pier Luigi1c34a442018-02-02 16:19:11 +0100337 * Removes the entire mcast tree related to this group.
338 *
339 * @param mcastIp multicast group IP address
340 */
341 private void processRouteRemovedInternal(ConnectPoint source, IpAddress mcastIp) {
342 lastMcastChange = Instant.now();
343 mcastLock();
344 try {
Pier Luigi1f5dc372018-02-26 12:31:38 +0100345 log.debug("Processing route removed for group {}", mcastIp);
Pier Luigi1c34a442018-02-02 16:19:11 +0100346
347 // Find out the ingress, transit and egress device of the affected group
348 DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
349 .stream().findAny().orElse(null);
350 DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT)
351 .stream().findAny().orElse(null);
352 Set<DeviceId> egressDevices = getDevice(mcastIp, McastRole.EGRESS);
353
354 // Verify leadership on the operation
355 if (!isLeader(source)) {
356 log.debug("Skip {} due to lack of leadership", mcastIp);
357 return;
358 }
359
360 // If there are egress devices, sinks could be only on the ingress
361 if (!egressDevices.isEmpty()) {
362 egressDevices.forEach(
363 deviceId -> removeGroupFromDevice(deviceId, mcastIp, assignedVlan(null))
364 );
365 }
366 // Transit could be null
367 if (transitDevice != null) {
368 removeGroupFromDevice(transitDevice, mcastIp, assignedVlan(null));
369 }
370 // Ingress device should be not null
371 if (ingressDevice != null) {
372 removeGroupFromDevice(ingressDevice, mcastIp, assignedVlan(source));
373 }
Pier Luigi1c34a442018-02-02 16:19:11 +0100374 } finally {
375 mcastUnlock();
376 }
377 }
378
379 /**
Pier Luigi3dfd8352018-01-25 16:16:02 +0100380 * Removes a path from source to sink for given multicast group.
381 *
382 * @param source connect point of the multicast source
383 * @param sink connection point of the multicast sink
384 * @param mcastIp multicast group IP address
385 */
386 private void processSinkRemovedInternal(ConnectPoint source, ConnectPoint sink,
387 IpAddress mcastIp) {
388 lastMcastChange = Instant.now();
389 mcastLock();
390 try {
Pier Luigi1c34a442018-02-02 16:19:11 +0100391 // Verify leadership on the operation
392 if (!isLeader(source)) {
393 log.debug("Skip {} due to lack of leadership", mcastIp);
Charles Chand55e84d2016-03-30 17:54:24 -0700394 return;
395 }
Charles Chand55e84d2016-03-30 17:54:24 -0700396
Pier Luigi3dfd8352018-01-25 16:16:02 +0100397 // When source and sink are on the same device
398 if (source.deviceId().equals(sink.deviceId())) {
399 // Source and sink are on even the same port. There must be something wrong.
400 if (source.port().equals(sink.port())) {
401 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
402 mcastIp, sink, source);
403 return;
404 }
405 removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(source));
406 return;
407 }
Charles Chand55e84d2016-03-30 17:54:24 -0700408
Pier Luigi3dfd8352018-01-25 16:16:02 +0100409 // Process the egress device
410 boolean isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(null));
411 if (isLast) {
412 mcastRoleStore.remove(new McastStoreKey(mcastIp, sink.deviceId()));
413 }
414
415 // If this is the last sink on the device, also update upstream
416 Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp);
417 if (mcastPath.isPresent()) {
418 List<Link> links = Lists.newArrayList(mcastPath.get().links());
419 Collections.reverse(links);
420 for (Link link : links) {
421 if (isLast) {
422 isLast = removePortFromDevice(
423 link.src().deviceId(),
424 link.src().port(),
425 mcastIp,
426 assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null)
427 );
428 mcastRoleStore.remove(new McastStoreKey(mcastIp, link.src().deviceId()));
429 }
Charles Chand55e84d2016-03-30 17:54:24 -0700430 }
431 }
Pier Luigi3dfd8352018-01-25 16:16:02 +0100432 } finally {
433 mcastUnlock();
Charles Chand55e84d2016-03-30 17:54:24 -0700434 }
435 }
436
437 /**
438 * Establishes a path from source to sink for given multicast group.
439 *
440 * @param source connect point of the multicast source
441 * @param sink connection point of the multicast sink
442 * @param mcastIp multicast group IP address
443 */
444 private void processSinkAddedInternal(ConnectPoint source, ConnectPoint sink,
445 IpAddress mcastIp) {
Pier Luigi3dfd8352018-01-25 16:16:02 +0100446 lastMcastChange = Instant.now();
447 mcastLock();
448 try {
449 // Continue only when this instance is the master of source device
450 if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
451 log.debug("Skip {} due to lack of mastership of the source device {}",
452 mcastIp, source.deviceId());
Charles Chand55e84d2016-03-30 17:54:24 -0700453 return;
454 }
Charles Chand55e84d2016-03-30 17:54:24 -0700455
Pier Luigi3dfd8352018-01-25 16:16:02 +0100456 // Process the ingress device
457 addFilterToDevice(source.deviceId(), source.port(), assignedVlan(source), mcastIp);
Charles Chan2199c302016-04-23 17:36:10 -0700458
Pier Luigi3dfd8352018-01-25 16:16:02 +0100459 // When source and sink are on the same device
460 if (source.deviceId().equals(sink.deviceId())) {
461 // Source and sink are on even the same port. There must be something wrong.
462 if (source.port().equals(sink.port())) {
463 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
464 mcastIp, sink, source);
465 return;
466 }
467 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(source));
468 mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()), McastRole.INGRESS);
469 return;
470 }
Charles Chan2199c302016-04-23 17:36:10 -0700471
Pier Luigi3dfd8352018-01-25 16:16:02 +0100472 // Find a path. If present, create/update groups and flows for each hop
473 Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp);
474 if (mcastPath.isPresent()) {
475 List<Link> links = mcastPath.get().links();
476 checkState(links.size() == 2,
477 "Path in leaf-spine topology should always be two hops: ", links);
Charles Chan2199c302016-04-23 17:36:10 -0700478
Pier Luigi3dfd8352018-01-25 16:16:02 +0100479 links.forEach(link -> {
480 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
481 assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
482 addFilterToDevice(link.dst().deviceId(), link.dst().port(), assignedVlan(null), mcastIp);
483 });
484
485 // Process the egress device
486 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(null));
487
488 // Setup mcast roles
489 mcastRoleStore.put(new McastStoreKey(mcastIp, source.deviceId()),
490 McastRole.INGRESS);
491 mcastRoleStore.put(new McastStoreKey(mcastIp, links.get(0).dst().deviceId()),
492 McastRole.TRANSIT);
493 mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()),
494 McastRole.EGRESS);
495 } else {
496 log.warn("Unable to find a path from {} to {}. Abort sinkAdded",
497 source.deviceId(), sink.deviceId());
498 }
499 } finally {
500 mcastUnlock();
Charles Chand55e84d2016-03-30 17:54:24 -0700501 }
502 }
503
504 /**
Charles Chan2199c302016-04-23 17:36:10 -0700505 * Processes the LINK_DOWN event.
506 *
507 * @param affectedLink Link that is going down
508 */
Pier Luigi004d7cf2018-02-28 12:10:50 +0100509 public void processLinkDown(Link affectedLink) {
Pier Luigi3dfd8352018-01-25 16:16:02 +0100510 lastMcastChange = Instant.now();
511 mcastLock();
512 try {
513 // Get groups affected by the link down event
514 getAffectedGroups(affectedLink).forEach(mcastIp -> {
515 // TODO Optimize when the group editing is in place
516 log.debug("Processing link down {} for group {}",
517 affectedLink, mcastIp);
Pier Luigi24592652018-01-16 10:47:50 +0100518
Pier Luigi3dfd8352018-01-25 16:16:02 +0100519 // Find out the ingress, transit and egress device of affected group
520 DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
521 .stream().findAny().orElse(null);
522 DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT)
523 .stream().findAny().orElse(null);
524 Set<DeviceId> egressDevices = getDevice(mcastIp, McastRole.EGRESS);
525 ConnectPoint source = getSource(mcastIp);
Charles Chan8d449862016-05-16 18:44:13 -0700526
Pier Luigi3dfd8352018-01-25 16:16:02 +0100527 // Do not proceed if any of these info is missing
528 if (ingressDevice == null || transitDevice == null
529 || egressDevices == null || source == null) {
530 log.warn("Missing ingress {}, transit {}, egress {} devices or source {}",
531 ingressDevice, transitDevice, egressDevices, source);
532 return;
Charles Chan2199c302016-04-23 17:36:10 -0700533 }
Pier Luigi3dfd8352018-01-25 16:16:02 +0100534
535 // Continue only when this instance is the master of source device
536 if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
537 log.debug("Skip {} due to lack of mastership of the source device {}",
538 source.deviceId());
539 return;
540 }
541
542 // Remove entire transit
543 removeGroupFromDevice(transitDevice, mcastIp, assignedVlan(null));
544
545 // Remove transit-facing port on ingress device
546 PortNumber ingressTransitPort = ingressTransitPort(mcastIp);
547 if (ingressTransitPort != null) {
548 removePortFromDevice(ingressDevice, ingressTransitPort, mcastIp, assignedVlan(source));
549 mcastRoleStore.remove(new McastStoreKey(mcastIp, transitDevice));
550 }
551
552 // Construct a new path for each egress device
553 egressDevices.forEach(egressDevice -> {
554 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice, mcastIp);
555 if (mcastPath.isPresent()) {
556 installPath(mcastIp, source, mcastPath.get());
557 } else {
558 log.warn("Fail to recover egress device {} from link failure {}",
559 egressDevice, affectedLink);
560 removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null));
561 }
562 });
Charles Chan2199c302016-04-23 17:36:10 -0700563 });
Pier Luigi3dfd8352018-01-25 16:16:02 +0100564 } finally {
565 mcastUnlock();
566 }
Charles Chan2199c302016-04-23 17:36:10 -0700567 }
568
569 /**
Pier Luigi24592652018-01-16 10:47:50 +0100570 * Process the DEVICE_DOWN event.
571 *
572 * @param deviceDown device going down
573 */
Pier Luigi004d7cf2018-02-28 12:10:50 +0100574 public void processDeviceDown(DeviceId deviceDown) {
Pier Luigi3dfd8352018-01-25 16:16:02 +0100575 lastMcastChange = Instant.now();
576 mcastLock();
577 try {
578 // Get the mcast groups affected by the device going down
579 getAffectedGroups(deviceDown).forEach(mcastIp -> {
580 // TODO Optimize when the group editing is in place
581 log.debug("Processing device down {} for group {}",
582 deviceDown, mcastIp);
Pier Luigi24592652018-01-16 10:47:50 +0100583
Pier Luigi3dfd8352018-01-25 16:16:02 +0100584 // Find out the ingress, transit and egress device of affected group
585 DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
586 .stream().findAny().orElse(null);
587 DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT)
588 .stream().findAny().orElse(null);
589 Set<DeviceId> egressDevices = getDevice(mcastIp, McastRole.EGRESS);
590 ConnectPoint source = getSource(mcastIp);
Pier Luigi24592652018-01-16 10:47:50 +0100591
Pier Luigi3dfd8352018-01-25 16:16:02 +0100592 // Do not proceed if ingress device or source of this group are missing
593 // If sinks are in other leafs, we have ingress, transit, egress, and source
594 // If sinks are in the same leaf, we have just ingress and source
595 if (ingressDevice == null || source == null) {
596 log.warn("Missing ingress {} or source {} for group {}",
597 ingressDevice, source, mcastIp);
Pier Luigi24592652018-01-16 10:47:50 +0100598 return;
599 }
Pier Luigi24592652018-01-16 10:47:50 +0100600
Pier Luigi1c34a442018-02-02 16:19:11 +0100601 // Verify leadership on the operation
602 if (!isLeader(source)) {
603 log.debug("Skip {} due to lack of leadership", mcastIp);
604 return;
Pier Luigi24592652018-01-16 10:47:50 +0100605 }
Pier Luigi3dfd8352018-01-25 16:16:02 +0100606
607 // If it exists, we have to remove it in any case
608 if (transitDevice != null) {
609 // Remove entire transit
610 removeGroupFromDevice(transitDevice, mcastIp, assignedVlan(null));
611 }
612 // If the ingress is down
613 if (ingressDevice.equals(deviceDown)) {
614 // Remove entire ingress
615 removeGroupFromDevice(ingressDevice, mcastIp, assignedVlan(source));
616 // If other sinks different from the ingress exist
617 if (!egressDevices.isEmpty()) {
618 // Remove all the remaining egress
619 egressDevices.forEach(
620 egressDevice -> removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null))
621 );
Pier Luigi24592652018-01-16 10:47:50 +0100622 }
Pier Luigi3dfd8352018-01-25 16:16:02 +0100623 } else {
624 // Egress or transit could be down at this point
625 // Get the ingress-transit port if it exists
626 PortNumber ingressTransitPort = ingressTransitPort(mcastIp);
627 if (ingressTransitPort != null) {
628 // Remove transit-facing port on ingress device
629 removePortFromDevice(ingressDevice, ingressTransitPort, mcastIp, assignedVlan(source));
630 }
631 // One of the egress device is down
632 if (egressDevices.contains(deviceDown)) {
633 // Remove entire device down
634 removeGroupFromDevice(deviceDown, mcastIp, assignedVlan(null));
635 // Remove the device down from egress
636 egressDevices.remove(deviceDown);
637 // If there are no more egress and ingress does not have sinks
638 if (egressDevices.isEmpty() && !hasSinks(ingressDevice, mcastIp)) {
639 // Remove entire ingress
640 mcastRoleStore.remove(new McastStoreKey(mcastIp, ingressDevice));
641 // We have done
642 return;
643 }
644 }
645 // Construct a new path for each egress device
646 egressDevices.forEach(egressDevice -> {
647 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice, mcastIp);
648 // If there is a new path
649 if (mcastPath.isPresent()) {
650 // Let's install the new mcast path for this egress
651 installPath(mcastIp, source, mcastPath.get());
652 } else {
653 // We were not able to find an alternative path for this egress
654 log.warn("Fail to recover egress device {} from device down {}",
655 egressDevice, deviceDown);
656 removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null));
657 }
658 });
659 }
660 });
661 } finally {
662 mcastUnlock();
663 }
Pier Luigi24592652018-01-16 10:47:50 +0100664 }
665
666 /**
Charles Chand55e84d2016-03-30 17:54:24 -0700667 * Adds filtering objective for given device and port.
668 *
669 * @param deviceId device ID
670 * @param port ingress port number
671 * @param assignedVlan assigned VLAN ID
672 */
Julia Ferguson65428c32017-08-10 18:15:24 +0000673 private void addFilterToDevice(DeviceId deviceId, PortNumber port, VlanId assignedVlan, IpAddress mcastIp) {
Charles Chand55e84d2016-03-30 17:54:24 -0700674 // Do nothing if the port is configured as suppressed
Charles Chan6ea94fc2016-05-10 17:29:47 -0700675 ConnectPoint connectPoint = new ConnectPoint(deviceId, port);
676 SegmentRoutingAppConfig appConfig = srManager.cfgService
Pier Luigi004d7cf2018-02-28 12:10:50 +0100677 .getConfig(srManager.appId(), SegmentRoutingAppConfig.class);
Charles Chan6ea94fc2016-05-10 17:29:47 -0700678 if (appConfig != null && appConfig.suppressSubnet().contains(connectPoint)) {
679 log.info("Ignore suppressed port {}", connectPoint);
Charles Chand55e84d2016-03-30 17:54:24 -0700680 return;
681 }
682
Charles Chan93090352018-03-02 13:26:22 -0800683 MacAddress routerMac;
684 try {
685 routerMac = srManager.deviceConfiguration().getDeviceMac(deviceId);
686 } catch (DeviceConfigNotFoundException dcnfe) {
687 log.warn("Fail to push filtering objective since device is not configured. Abort");
688 return;
689 }
690
Charles Chand55e84d2016-03-30 17:54:24 -0700691 FilteringObjective.Builder filtObjBuilder =
Charles Chan93090352018-03-02 13:26:22 -0800692 filterObjBuilder(deviceId, port, assignedVlan, mcastIp, routerMac);
Charles Chan2199c302016-04-23 17:36:10 -0700693 ObjectiveContext context = new DefaultObjectiveContext(
694 (objective) -> log.debug("Successfully add filter on {}/{}, vlan {}",
Charles Chan59cc16d2017-02-02 16:20:42 -0800695 deviceId, port.toLong(), assignedVlan),
Charles Chan2199c302016-04-23 17:36:10 -0700696 (objective, error) ->
697 log.warn("Failed to add filter on {}/{}, vlan {}: {}",
Charles Chan59cc16d2017-02-02 16:20:42 -0800698 deviceId, port.toLong(), assignedVlan, error));
Charles Chan2199c302016-04-23 17:36:10 -0700699 srManager.flowObjectiveService.filter(deviceId, filtObjBuilder.add(context));
Charles Chand55e84d2016-03-30 17:54:24 -0700700 }
701
702 /**
703 * Adds a port to given multicast group on given device. This involves the
704 * update of L3 multicast group and multicast routing table entry.
705 *
706 * @param deviceId device ID
707 * @param port port to be added
708 * @param mcastIp multicast group
709 * @param assignedVlan assigned VLAN ID
710 */
711 private void addPortToDevice(DeviceId deviceId, PortNumber port,
712 IpAddress mcastIp, VlanId assignedVlan) {
Charles Chan2199c302016-04-23 17:36:10 -0700713 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId);
Charles Chand55e84d2016-03-30 17:54:24 -0700714 ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
Pier Luigi7b7a29d2018-01-19 10:24:53 +0100715 NextObjective newNextObj;
Charles Chan2199c302016-04-23 17:36:10 -0700716 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Charles Chand55e84d2016-03-30 17:54:24 -0700717 // First time someone request this mcast group via this device
718 portBuilder.add(port);
Pier Luigi7b7a29d2018-01-19 10:24:53 +0100719 // New nextObj
720 newNextObj = nextObjBuilder(mcastIp, assignedVlan,
721 portBuilder.build(), null).add();
722 // Store the new port
723 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Charles Chand55e84d2016-03-30 17:54:24 -0700724 } else {
725 // This device already serves some subscribers of this mcast group
Charles Chan2199c302016-04-23 17:36:10 -0700726 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chand55e84d2016-03-30 17:54:24 -0700727 // Stop if the port is already in the nextobj
728 Set<PortNumber> existingPorts = getPorts(nextObj.next());
729 if (existingPorts.contains(port)) {
730 log.info("NextObj for {}/{} already exists. Abort", deviceId, port);
731 return;
732 }
Pier Luigi7b7a29d2018-01-19 10:24:53 +0100733 // Let's add the port and reuse the previous one
Yuta HIGUCHI7e57aaa2018-02-09 18:05:23 -0800734 portBuilder.addAll(existingPorts).add(port);
Pier Luigi7b7a29d2018-01-19 10:24:53 +0100735 // Reuse previous nextObj
736 newNextObj = nextObjBuilder(mcastIp, assignedVlan,
737 portBuilder.build(), nextObj.id()).addToExisting();
738 // Store the final next objective and send only the difference to the driver
739 mcastNextObjStore.put(mcastStoreKey, newNextObj);
740 // Add just the new port
741 portBuilder = ImmutableSet.builder();
742 portBuilder.add(port);
743 newNextObj = nextObjBuilder(mcastIp, assignedVlan,
744 portBuilder.build(), nextObj.id()).addToExisting();
Charles Chand55e84d2016-03-30 17:54:24 -0700745 }
746 // Create, store and apply the new nextObj and fwdObj
Charles Chan2199c302016-04-23 17:36:10 -0700747 ObjectiveContext context = new DefaultObjectiveContext(
748 (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}",
749 mcastIp, deviceId, port.toLong(), assignedVlan),
750 (objective, error) ->
751 log.warn("Failed to add {} on {}/{}, vlan {}: {}",
752 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Charles Chand55e84d2016-03-30 17:54:24 -0700753 ForwardingObjective fwdObj =
Charles Chan2199c302016-04-23 17:36:10 -0700754 fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add(context);
Charles Chand55e84d2016-03-30 17:54:24 -0700755 srManager.flowObjectiveService.next(deviceId, newNextObj);
756 srManager.flowObjectiveService.forward(deviceId, fwdObj);
Charles Chand55e84d2016-03-30 17:54:24 -0700757 }
758
759 /**
760 * Removes a port from given multicast group on given device.
761 * This involves the update of L3 multicast group and multicast routing
762 * table entry.
763 *
764 * @param deviceId device ID
765 * @param port port to be added
766 * @param mcastIp multicast group
767 * @param assignedVlan assigned VLAN ID
768 * @return true if this is the last sink on this device
769 */
770 private boolean removePortFromDevice(DeviceId deviceId, PortNumber port,
771 IpAddress mcastIp, VlanId assignedVlan) {
Charles Chan2199c302016-04-23 17:36:10 -0700772 McastStoreKey mcastStoreKey =
773 new McastStoreKey(mcastIp, deviceId);
Charles Chand55e84d2016-03-30 17:54:24 -0700774 // This device is not serving this multicast group
Charles Chan2199c302016-04-23 17:36:10 -0700775 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Charles Chand55e84d2016-03-30 17:54:24 -0700776 log.warn("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
777 return false;
778 }
Charles Chan2199c302016-04-23 17:36:10 -0700779 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chand55e84d2016-03-30 17:54:24 -0700780
781 Set<PortNumber> existingPorts = getPorts(nextObj.next());
Charles Chan2199c302016-04-23 17:36:10 -0700782 // This port does not serve this multicast group
Charles Chand55e84d2016-03-30 17:54:24 -0700783 if (!existingPorts.contains(port)) {
784 log.warn("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
785 return false;
786 }
787 // Copy and modify the ImmutableSet
788 existingPorts = Sets.newHashSet(existingPorts);
789 existingPorts.remove(port);
790
791 NextObjective newNextObj;
Pier Luigib525fe92018-01-19 10:24:53 +0100792 ObjectiveContext context;
Charles Chand55e84d2016-03-30 17:54:24 -0700793 ForwardingObjective fwdObj;
794 if (existingPorts.isEmpty()) {
Pier Luigib525fe92018-01-19 10:24:53 +0100795 // If this is the last sink, remove flows and last bucket
Charles Chand55e84d2016-03-30 17:54:24 -0700796 // NOTE: Rely on GroupStore garbage collection rather than explicitly
797 // remove L3MG since there might be other flows/groups refer to
798 // the same L2IG
Pier Luigib525fe92018-01-19 10:24:53 +0100799 context = new DefaultObjectiveContext(
Charles Chan2199c302016-04-23 17:36:10 -0700800 (objective) -> log.debug("Successfully remove {} on {}/{}, vlan {}",
801 mcastIp, deviceId, port.toLong(), assignedVlan),
802 (objective, error) ->
803 log.warn("Failed to remove {} on {}/{}, vlan {}: {}",
804 mcastIp, deviceId, port.toLong(), assignedVlan, error));
805 fwdObj = fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
806 mcastNextObjStore.remove(mcastStoreKey);
Charles Chand55e84d2016-03-30 17:54:24 -0700807 } else {
808 // If this is not the last sink, update flows and groups
Pier Luigib525fe92018-01-19 10:24:53 +0100809 context = new DefaultObjectiveContext(
Charles Chan2199c302016-04-23 17:36:10 -0700810 (objective) -> log.debug("Successfully update {} on {}/{}, vlan {}",
811 mcastIp, deviceId, port.toLong(), assignedVlan),
812 (objective, error) ->
813 log.warn("Failed to update {} on {}/{}, vlan {}: {}",
814 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Pier Luigib525fe92018-01-19 10:24:53 +0100815 // Here we store the next objective with the remaining port
816 newNextObj = nextObjBuilder(mcastIp, assignedVlan,
817 existingPorts, nextObj.id()).removeFromExisting();
Charles Chanfc5c7802016-05-17 13:13:55 -0700818 fwdObj = fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add(context);
Charles Chan2199c302016-04-23 17:36:10 -0700819 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Charles Chand55e84d2016-03-30 17:54:24 -0700820 }
Pier Luigib525fe92018-01-19 10:24:53 +0100821 // Let's modify the next objective removing the bucket
822 newNextObj = nextObjBuilder(mcastIp, assignedVlan,
823 ImmutableSet.of(port), nextObj.id()).removeFromExisting();
824 srManager.flowObjectiveService.next(deviceId, newNextObj);
825 srManager.flowObjectiveService.forward(deviceId, fwdObj);
Charles Chand55e84d2016-03-30 17:54:24 -0700826 return existingPorts.isEmpty();
827 }
828
Charles Chan2199c302016-04-23 17:36:10 -0700829 /**
830 * Removes entire group on given device.
831 *
832 * @param deviceId device ID
833 * @param mcastIp multicast group to be removed
834 * @param assignedVlan assigned VLAN ID
835 */
836 private void removeGroupFromDevice(DeviceId deviceId, IpAddress mcastIp,
837 VlanId assignedVlan) {
838 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId);
839 // This device is not serving this multicast group
840 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
841 log.warn("{} is not serving {}. Abort.", deviceId, mcastIp);
842 return;
843 }
844 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
845 // NOTE: Rely on GroupStore garbage collection rather than explicitly
846 // remove L3MG since there might be other flows/groups refer to
847 // the same L2IG
848 ObjectiveContext context = new DefaultObjectiveContext(
849 (objective) -> log.debug("Successfully remove {} on {}, vlan {}",
850 mcastIp, deviceId, assignedVlan),
851 (objective, error) ->
852 log.warn("Failed to remove {} on {}, vlan {}: {}",
853 mcastIp, deviceId, assignedVlan, error));
854 ForwardingObjective fwdObj = fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
855 srManager.flowObjectiveService.forward(deviceId, fwdObj);
856 mcastNextObjStore.remove(mcastStoreKey);
857 mcastRoleStore.remove(mcastStoreKey);
858 }
859
Pier Luigi24592652018-01-16 10:47:50 +0100860 private void installPath(IpAddress mcastIp, ConnectPoint source, Path mcastPath) {
861 // Get Links
862 List<Link> links = mcastPath.links();
863 // For each link, modify the next on the source device adding the src port
864 // and a new filter objective on the destination port
865 links.forEach(link -> {
866 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
867 assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
868 addFilterToDevice(link.dst().deviceId(), link.dst().port(), assignedVlan(null),
869 mcastIp);
870 });
871 // Setup new transit mcast role
872 mcastRoleStore.put(new McastStoreKey(mcastIp, links.get(0).dst().deviceId()),
873 McastRole.TRANSIT);
Charles Chan2199c302016-04-23 17:36:10 -0700874 }
875
Charles Chand55e84d2016-03-30 17:54:24 -0700876 /**
877 * Creates a next objective builder for multicast.
878 *
879 * @param mcastIp multicast group
880 * @param assignedVlan assigned VLAN ID
881 * @param outPorts set of output port numbers
882 * @return next objective builder
883 */
884 private NextObjective.Builder nextObjBuilder(IpAddress mcastIp,
Pier Luigi7b7a29d2018-01-19 10:24:53 +0100885 VlanId assignedVlan, Set<PortNumber> outPorts, Integer nextId) {
886 // If nextId is null allocate a new one
887 if (nextId == null) {
888 nextId = srManager.flowObjectiveService.allocateNextId();
889 }
Charles Chand55e84d2016-03-30 17:54:24 -0700890
891 TrafficSelector metadata =
892 DefaultTrafficSelector.builder()
893 .matchVlanId(assignedVlan)
894 .matchIPDst(mcastIp.toIpPrefix())
895 .build();
896
897 NextObjective.Builder nextObjBuilder = DefaultNextObjective
898 .builder().withId(nextId)
Pier Luigi004d7cf2018-02-28 12:10:50 +0100899 .withType(NextObjective.Type.BROADCAST).fromApp(srManager.appId())
Charles Chand55e84d2016-03-30 17:54:24 -0700900 .withMeta(metadata);
901
902 outPorts.forEach(port -> {
903 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
904 if (egressVlan().equals(VlanId.NONE)) {
905 tBuilder.popVlan();
906 }
907 tBuilder.setOutput(port);
908 nextObjBuilder.addTreatment(tBuilder.build());
909 });
910
911 return nextObjBuilder;
912 }
913
914 /**
915 * Creates a forwarding objective builder for multicast.
916 *
917 * @param mcastIp multicast group
918 * @param assignedVlan assigned VLAN ID
919 * @param nextId next ID of the L3 multicast group
920 * @return forwarding objective builder
921 */
922 private ForwardingObjective.Builder fwdObjBuilder(IpAddress mcastIp,
923 VlanId assignedVlan, int nextId) {
924 TrafficSelector.Builder sbuilder = DefaultTrafficSelector.builder();
Julia Ferguson65428c32017-08-10 18:15:24 +0000925 IpPrefix mcastPrefix = mcastIp.toIpPrefix();
926
927 if (mcastIp.isIp4()) {
928 sbuilder.matchEthType(Ethernet.TYPE_IPV4);
929 sbuilder.matchIPDst(mcastPrefix);
930 } else {
931 sbuilder.matchEthType(Ethernet.TYPE_IPV6);
932 sbuilder.matchIPv6Dst(mcastPrefix);
933 }
934
935
Charles Chand55e84d2016-03-30 17:54:24 -0700936 TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder();
937 metabuilder.matchVlanId(assignedVlan);
938
939 ForwardingObjective.Builder fwdBuilder = DefaultForwardingObjective.builder();
940 fwdBuilder.withSelector(sbuilder.build())
941 .withMeta(metabuilder.build())
942 .nextStep(nextId)
943 .withFlag(ForwardingObjective.Flag.SPECIFIC)
Pier Luigi004d7cf2018-02-28 12:10:50 +0100944 .fromApp(srManager.appId())
Charles Chand55e84d2016-03-30 17:54:24 -0700945 .withPriority(SegmentRoutingService.DEFAULT_PRIORITY);
946 return fwdBuilder;
947 }
948
949 /**
950 * Creates a filtering objective builder for multicast.
951 *
952 * @param deviceId Device ID
953 * @param ingressPort ingress port of the multicast stream
954 * @param assignedVlan assigned VLAN ID
Charles Chan93090352018-03-02 13:26:22 -0800955 * @param routerMac router MAC. This is carried in metadata and used from some switches that
956 * need to put unicast entry before multicast entry in TMAC table.
Charles Chand55e84d2016-03-30 17:54:24 -0700957 * @return filtering objective builder
958 */
959 private FilteringObjective.Builder filterObjBuilder(DeviceId deviceId, PortNumber ingressPort,
Charles Chan93090352018-03-02 13:26:22 -0800960 VlanId assignedVlan, IpAddress mcastIp, MacAddress routerMac) {
Charles Chand55e84d2016-03-30 17:54:24 -0700961 FilteringObjective.Builder filtBuilder = DefaultFilteringObjective.builder();
Charles Chan1588e7b2016-06-28 16:50:13 -0700962
Julia Ferguson65428c32017-08-10 18:15:24 +0000963 if (mcastIp.isIp4()) {
964 filtBuilder.withKey(Criteria.matchInPort(ingressPort))
965 .addCondition(Criteria.matchEthDstMasked(MacAddress.IPV4_MULTICAST,
966 MacAddress.IPV4_MULTICAST_MASK))
967 .addCondition(Criteria.matchVlanId(egressVlan()))
968 .withPriority(SegmentRoutingService.DEFAULT_PRIORITY);
969 } else {
970 filtBuilder.withKey(Criteria.matchInPort(ingressPort))
971 .addCondition(Criteria.matchEthDstMasked(MacAddress.IPV6_MULTICAST,
972 MacAddress.IPV6_MULTICAST_MASK))
973 .addCondition(Criteria.matchVlanId(egressVlan()))
974 .withPriority(SegmentRoutingService.DEFAULT_PRIORITY);
975 }
Charles Chan1588e7b2016-06-28 16:50:13 -0700976 TrafficTreatment tt = DefaultTrafficTreatment.builder()
Charles Chan93090352018-03-02 13:26:22 -0800977 .pushVlan().setVlanId(assignedVlan)
978 .setEthDst(routerMac)
979 .build();
Charles Chan1588e7b2016-06-28 16:50:13 -0700980 filtBuilder.withMeta(tt);
981
Pier Luigi004d7cf2018-02-28 12:10:50 +0100982 return filtBuilder.permit().fromApp(srManager.appId());
Charles Chand55e84d2016-03-30 17:54:24 -0700983 }
984
985 /**
986 * Gets output ports information from treatments.
987 *
988 * @param treatments collection of traffic treatments
989 * @return set of output port numbers
990 */
991 private Set<PortNumber> getPorts(Collection<TrafficTreatment> treatments) {
992 ImmutableSet.Builder<PortNumber> builder = ImmutableSet.builder();
993 treatments.forEach(treatment -> {
994 treatment.allInstructions().stream()
995 .filter(instr -> instr instanceof OutputInstruction)
996 .forEach(instr -> {
997 builder.add(((OutputInstruction) instr).port());
998 });
999 });
1000 return builder.build();
1001 }
1002
Pier Luigiba5a7c32018-02-23 19:57:40 +01001003 // Utility method to verify is a link is a pair-link
1004 private boolean isPairLink(Link link) {
1005 // Take src id, src port, dst id and dst port
1006 final DeviceId srcId = link.src().deviceId();
1007 final PortNumber srcPort = link.src().port();
1008 final DeviceId dstId = link.dst().deviceId();
1009 final PortNumber dstPort = link.dst().port();
1010 // init as true
1011 boolean isPairLink = true;
1012 try {
1013 // If one of this condition is not true; it is not a pair link
Pier Luigi004d7cf2018-02-28 12:10:50 +01001014 if (!(srManager.deviceConfiguration().isEdgeDevice(srcId) &&
1015 srManager.deviceConfiguration().isEdgeDevice(dstId) &&
1016 srManager.deviceConfiguration().getPairDeviceId(srcId).equals(dstId) &&
1017 srManager.deviceConfiguration().getPairLocalPort(srcId).equals(srcPort) &&
1018 srManager.deviceConfiguration().getPairLocalPort(dstId).equals(dstPort))) {
Pier Luigiba5a7c32018-02-23 19:57:40 +01001019 isPairLink = false;
1020 }
1021 } catch (DeviceConfigNotFoundException e) {
1022 // Configuration not provided
1023 log.warn("Could not check if the link {} is pairlink "
1024 + "config not yet provided", link);
1025 isPairLink = false;
1026 }
1027 return isPairLink;
1028 }
1029
Charles Chand55e84d2016-03-30 17:54:24 -07001030 /**
1031 * Gets a path from src to dst.
1032 * If a path was allocated before, returns the allocated path.
1033 * Otherwise, randomly pick one from available paths.
1034 *
1035 * @param src source device ID
1036 * @param dst destination device ID
1037 * @param mcastIp multicast group
1038 * @return an optional path from src to dst
1039 */
1040 private Optional<Path> getPath(DeviceId src, DeviceId dst, IpAddress mcastIp) {
Pier Luigi80cb7482018-02-15 16:33:08 +01001041 // Takes a snapshot of the topology
1042 final Topology currentTopology = topologyService.currentTopology();
Charles Chand55e84d2016-03-30 17:54:24 -07001043 List<Path> allPaths = Lists.newArrayList(
Pier Luigi80cb7482018-02-15 16:33:08 +01001044 topologyService.getPaths(currentTopology, src, dst)
1045 );
Pier Luigiba5a7c32018-02-23 19:57:40 +01001046 // Create list of valid paths
1047 allPaths.removeIf(path -> path.links().stream().anyMatch(this::isPairLink));
1048 // If there are no valid paths, just exit
Charles Chan2199c302016-04-23 17:36:10 -07001049 log.debug("{} path(s) found from {} to {}", allPaths.size(), src, dst);
Charles Chand55e84d2016-03-30 17:54:24 -07001050 if (allPaths.isEmpty()) {
Charles Chand55e84d2016-03-30 17:54:24 -07001051 return Optional.empty();
1052 }
1053
Pier Luigi5131dad2018-01-23 16:06:38 +01001054 // Create a map index of suitablity-to-list of paths. For example
1055 // a path in the list associated to the index 1 shares only the
1056 // first hop and it is less suitable of a path belonging to the index
1057 // 2 that shares leaf-spine.
1058 Map<Integer, List<Path>> eligiblePaths = Maps.newHashMap();
1059 // Some init steps
1060 int nhop;
1061 McastStoreKey mcastStoreKey;
1062 Link hop;
1063 PortNumber srcPort;
1064 Set<PortNumber> existingPorts;
1065 NextObjective nextObj;
1066 // Iterate over paths looking for eligible paths
1067 for (Path path : allPaths) {
1068 // Unlikely, it will happen...
1069 if (!src.equals(path.links().get(0).src().deviceId())) {
1070 continue;
1071 }
1072 nhop = 0;
1073 // Iterate over the links
1074 while (nhop < path.links().size()) {
1075 // Get the link and verify if a next related
1076 // to the src device exist in the store
1077 hop = path.links().get(nhop);
1078 mcastStoreKey = new McastStoreKey(mcastIp, hop.src().deviceId());
1079 // It does not exist in the store, exit
1080 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
1081 break;
Charles Chand55e84d2016-03-30 17:54:24 -07001082 }
Pier Luigi5131dad2018-01-23 16:06:38 +01001083 // Get the output ports on the next
1084 nextObj = mcastNextObjStore.get(mcastStoreKey).value();
1085 existingPorts = getPorts(nextObj.next());
1086 // And the src port on the link
1087 srcPort = hop.src().port();
1088 // the src port is not used as output, exit
1089 if (!existingPorts.contains(srcPort)) {
1090 break;
1091 }
1092 nhop++;
1093 }
1094 // n_hop defines the index
1095 if (nhop > 0) {
1096 eligiblePaths.compute(nhop, (index, paths) -> {
1097 paths = paths == null ? Lists.newArrayList() : paths;
1098 paths.add(path);
1099 return paths;
1100 });
Charles Chand55e84d2016-03-30 17:54:24 -07001101 }
1102 }
Pier Luigi5131dad2018-01-23 16:06:38 +01001103
1104 // No suitable paths
1105 if (eligiblePaths.isEmpty()) {
1106 log.debug("No eligiblePath(s) found from {} to {}", src, dst);
1107 // Otherwise, randomly pick a path
1108 Collections.shuffle(allPaths);
1109 return allPaths.stream().findFirst();
1110 }
1111
1112 // Let's take the best ones
1113 Integer bestIndex = eligiblePaths.keySet()
1114 .stream()
1115 .sorted(Comparator.reverseOrder())
1116 .findFirst().orElse(null);
1117 List<Path> bestPaths = eligiblePaths.get(bestIndex);
1118 log.debug("{} eligiblePath(s) found from {} to {}",
1119 bestPaths.size(), src, dst);
1120 // randomly pick a path on the highest index
1121 Collections.shuffle(bestPaths);
1122 return bestPaths.stream().findFirst();
Charles Chand55e84d2016-03-30 17:54:24 -07001123 }
1124
1125 /**
Charles Chan2199c302016-04-23 17:36:10 -07001126 * Gets device(s) of given role in given multicast group.
1127 *
1128 * @param mcastIp multicast IP
1129 * @param role multicast role
1130 * @return set of device ID or empty set if not found
1131 */
1132 private Set<DeviceId> getDevice(IpAddress mcastIp, McastRole role) {
1133 return mcastRoleStore.entrySet().stream()
1134 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
1135 entry.getValue().value() == role)
1136 .map(Map.Entry::getKey).map(McastStoreKey::deviceId)
1137 .collect(Collectors.toSet());
1138 }
1139
1140 /**
Charles Chan8d449862016-05-16 18:44:13 -07001141 * Gets source connect point of given multicast group.
1142 *
1143 * @param mcastIp multicast IP
1144 * @return source connect point or null if not found
1145 */
1146 private ConnectPoint getSource(IpAddress mcastIp) {
1147 return srManager.multicastRouteService.getRoutes().stream()
1148 .filter(mcastRoute -> mcastRoute.group().equals(mcastIp))
1149 .map(mcastRoute -> srManager.multicastRouteService.fetchSource(mcastRoute))
1150 .findAny().orElse(null);
1151 }
1152
1153 /**
Charles Chan2199c302016-04-23 17:36:10 -07001154 * Gets groups which is affected by the link down event.
1155 *
1156 * @param link link going down
1157 * @return a set of multicast IpAddress
1158 */
1159 private Set<IpAddress> getAffectedGroups(Link link) {
1160 DeviceId deviceId = link.src().deviceId();
1161 PortNumber port = link.src().port();
1162 return mcastNextObjStore.entrySet().stream()
1163 .filter(entry -> entry.getKey().deviceId().equals(deviceId) &&
1164 getPorts(entry.getValue().value().next()).contains(port))
1165 .map(Map.Entry::getKey).map(McastStoreKey::mcastIp)
1166 .collect(Collectors.toSet());
1167 }
1168
1169 /**
Pier Luigi24592652018-01-16 10:47:50 +01001170 * Gets groups which are affected by the device down event.
1171 *
1172 * @param deviceId device going down
1173 * @return a set of multicast IpAddress
1174 */
1175 private Set<IpAddress> getAffectedGroups(DeviceId deviceId) {
1176 return mcastNextObjStore.entrySet().stream()
1177 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
1178 .map(Map.Entry::getKey).map(McastStoreKey::mcastIp)
1179 .collect(Collectors.toSet());
1180 }
1181
1182 /**
Charles Chand55e84d2016-03-30 17:54:24 -07001183 * Gets egress VLAN from McastConfig.
1184 *
1185 * @return egress VLAN or VlanId.NONE if not configured
1186 */
1187 private VlanId egressVlan() {
1188 McastConfig mcastConfig =
1189 srManager.cfgService.getConfig(coreAppId, McastConfig.class);
1190 return (mcastConfig != null) ? mcastConfig.egressVlan() : VlanId.NONE;
1191 }
1192
1193 /**
1194 * Gets assigned VLAN according to the value of egress VLAN.
Charles Chan8d449862016-05-16 18:44:13 -07001195 * If connect point is specified, try to reuse the assigned VLAN on the connect point.
Charles Chand55e84d2016-03-30 17:54:24 -07001196 *
Charles Chan8d449862016-05-16 18:44:13 -07001197 * @param cp connect point; Can be null if not specified
1198 * @return assigned VLAN ID
Charles Chand55e84d2016-03-30 17:54:24 -07001199 */
Charles Chan8d449862016-05-16 18:44:13 -07001200 private VlanId assignedVlan(ConnectPoint cp) {
1201 // Use the egressVlan if it is tagged
1202 if (!egressVlan().equals(VlanId.NONE)) {
1203 return egressVlan();
1204 }
1205 // Reuse unicast VLAN if the port has subnet configured
1206 if (cp != null) {
Charles Chanb4879a52017-10-20 19:09:16 -07001207 VlanId untaggedVlan = srManager.getInternalVlanId(cp);
Charles Chan59cc16d2017-02-02 16:20:42 -08001208 return (untaggedVlan != null) ? untaggedVlan : INTERNAL_VLAN;
Charles Chan8d449862016-05-16 18:44:13 -07001209 }
Charles Chan59cc16d2017-02-02 16:20:42 -08001210 // Use DEFAULT_VLAN if none of the above matches
1211 return SegmentRoutingManager.INTERNAL_VLAN;
Charles Chand55e84d2016-03-30 17:54:24 -07001212 }
Charles Chan2199c302016-04-23 17:36:10 -07001213
1214 /**
1215 * Gets the spine-facing port on ingress device of given multicast group.
1216 *
1217 * @param mcastIp multicast IP
1218 * @return spine-facing port on ingress device
1219 */
1220 private PortNumber ingressTransitPort(IpAddress mcastIp) {
1221 DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
1222 .stream().findAny().orElse(null);
1223 if (ingressDevice != null) {
1224 NextObjective nextObj = mcastNextObjStore
1225 .get(new McastStoreKey(mcastIp, ingressDevice)).value();
1226 Set<PortNumber> ports = getPorts(nextObj.next());
1227
1228 for (PortNumber port : ports) {
1229 // Spine-facing port should have no subnet and no xconnect
Pier Luigi004d7cf2018-02-28 12:10:50 +01001230 if (srManager.deviceConfiguration() != null &&
1231 srManager.deviceConfiguration().getPortSubnets(ingressDevice, port).isEmpty() &&
Charles Chanfc5c7802016-05-17 13:13:55 -07001232 !srManager.xConnectHandler.hasXConnect(new ConnectPoint(ingressDevice, port))) {
Charles Chan2199c302016-04-23 17:36:10 -07001233 return port;
1234 }
1235 }
1236 }
1237 return null;
1238 }
Jonghwan Hyun42fe1052017-08-25 17:48:36 -07001239
1240 /**
Pier Luigi24592652018-01-16 10:47:50 +01001241 * Verify if the given device has sinks
1242 * for the multicast group.
1243 *
1244 * @param deviceId device Id
1245 * @param mcastIp multicast IP
1246 * @return true if the device has sink for the group.
1247 * False otherwise.
1248 */
1249 private boolean hasSinks(DeviceId deviceId, IpAddress mcastIp) {
1250 if (deviceId != null) {
1251 // Get the nextobjective
1252 Versioned<NextObjective> versionedNextObj = mcastNextObjStore.get(
1253 new McastStoreKey(mcastIp, deviceId)
1254 );
1255 // If it exists
1256 if (versionedNextObj != null) {
1257 NextObjective nextObj = versionedNextObj.value();
1258 // Retrieves all the output ports
1259 Set<PortNumber> ports = getPorts(nextObj.next());
1260 // Tries to find at least one port that is not spine-facing
1261 for (PortNumber port : ports) {
1262 // Spine-facing port should have no subnet and no xconnect
Pier Luigi004d7cf2018-02-28 12:10:50 +01001263 if (srManager.deviceConfiguration() != null &&
1264 (!srManager.deviceConfiguration().getPortSubnets(deviceId, port).isEmpty() ||
Pier Luigi24592652018-01-16 10:47:50 +01001265 srManager.xConnectHandler.hasXConnect(new ConnectPoint(deviceId, port)))) {
1266 return true;
1267 }
1268 }
1269 }
1270 }
1271 return false;
1272 }
1273
1274 /**
Jonghwan Hyun42fe1052017-08-25 17:48:36 -07001275 * Removes filtering objective for given device and port.
1276 *
1277 * @param deviceId device ID
1278 * @param port ingress port number
1279 * @param assignedVlan assigned VLAN ID
1280 * @param mcastIp multicast IP address
1281 */
1282 private void removeFilterToDevice(DeviceId deviceId, PortNumber port, VlanId assignedVlan, IpAddress mcastIp) {
1283 // Do nothing if the port is configured as suppressed
1284 ConnectPoint connectPoint = new ConnectPoint(deviceId, port);
1285 SegmentRoutingAppConfig appConfig = srManager.cfgService
Pier Luigi004d7cf2018-02-28 12:10:50 +01001286 .getConfig(srManager.appId(), SegmentRoutingAppConfig.class);
Jonghwan Hyun42fe1052017-08-25 17:48:36 -07001287 if (appConfig != null && appConfig.suppressSubnet().contains(connectPoint)) {
1288 log.info("Ignore suppressed port {}", connectPoint);
1289 return;
1290 }
1291
Charles Chan93090352018-03-02 13:26:22 -08001292 MacAddress routerMac;
1293 try {
1294 routerMac = srManager.deviceConfiguration().getDeviceMac(deviceId);
1295 } catch (DeviceConfigNotFoundException dcnfe) {
1296 log.warn("Fail to push filtering objective since device is not configured. Abort");
1297 return;
1298 }
1299
Jonghwan Hyun42fe1052017-08-25 17:48:36 -07001300 FilteringObjective.Builder filtObjBuilder =
Charles Chan93090352018-03-02 13:26:22 -08001301 filterObjBuilder(deviceId, port, assignedVlan, mcastIp, routerMac);
Jonghwan Hyun42fe1052017-08-25 17:48:36 -07001302 ObjectiveContext context = new DefaultObjectiveContext(
1303 (objective) -> log.debug("Successfully removed filter on {}/{}, vlan {}",
1304 deviceId, port.toLong(), assignedVlan),
1305 (objective, error) ->
1306 log.warn("Failed to remove filter on {}/{}, vlan {}: {}",
1307 deviceId, port.toLong(), assignedVlan, error));
1308 srManager.flowObjectiveService.filter(deviceId, filtObjBuilder.remove(context));
1309 }
1310
1311 /**
Pier Luigi3dfd8352018-01-25 16:16:02 +01001312 * Updates filtering objective for given device and port.
1313 * It is called in general when the mcast config has been
1314 * changed.
Jonghwan Hyun42fe1052017-08-25 17:48:36 -07001315 *
1316 * @param deviceId device ID
1317 * @param portNum ingress port number
1318 * @param vlanId assigned VLAN ID
1319 * @param install true to add, false to remove
1320 */
Pier Luigi004d7cf2018-02-28 12:10:50 +01001321 public void updateFilterToDevice(DeviceId deviceId, PortNumber portNum,
Jonghwan Hyun42fe1052017-08-25 17:48:36 -07001322 VlanId vlanId, boolean install) {
Pier Luigi3dfd8352018-01-25 16:16:02 +01001323 lastMcastChange = Instant.now();
1324 mcastLock();
1325 try {
1326 // Iterates over the route and updates properly the filtering objective
1327 // on the source device.
1328 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
1329 ConnectPoint source = srManager.multicastRouteService.fetchSource(mcastRoute);
1330 if (source.deviceId().equals(deviceId) && source.port().equals(portNum)) {
1331 if (install) {
1332 addFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group());
1333 } else {
1334 removeFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group());
1335 }
Jonghwan Hyun42fe1052017-08-25 17:48:36 -07001336 }
Pier Luigi3dfd8352018-01-25 16:16:02 +01001337 });
1338 } finally {
1339 mcastUnlock();
1340 }
1341 }
1342
Pier Luigi1c34a442018-02-02 16:19:11 +01001343 private boolean isLeader(ConnectPoint source) {
1344 // Continue only when we have the mastership on the operation
1345 if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
1346 // When the source is available we just check the mastership
1347 if (srManager.deviceService.isAvailable(source.deviceId())) {
1348 return false;
1349 }
1350 // Fallback with Leadership service
1351 // source id is used a topic
1352 NodeId leader = srManager.leadershipService.runForLeadership(
1353 source.deviceId().toString()).leaderNodeId();
1354 // Verify if this node is the leader
1355 if (!srManager.clusterService.getLocalNode().id().equals(leader)) {
1356 return false;
1357 }
1358 }
1359 // Done
1360 return true;
1361 }
1362
Pier Luigi3dfd8352018-01-25 16:16:02 +01001363 /**
1364 * Performs bucket verification operation for all mcast groups in the devices.
1365 * Firstly, it verifies that mcast is stable before trying verification operation.
1366 * Verification consists in creating new nexts with VERIFY operation. Actually,
1367 * the operation is totally delegated to the driver.
1368 */
1369 private final class McastBucketCorrector implements Runnable {
1370
1371 @Override
1372 public void run() {
1373 // Verify if the Mcast has been stable for MCAST_STABLITY_THRESHOLD
1374 if (!isMcastStable()) {
1375 return;
Jonghwan Hyun42fe1052017-08-25 17:48:36 -07001376 }
Pier Luigi3dfd8352018-01-25 16:16:02 +01001377 // Acquires lock
1378 mcastLock();
1379 try {
1380 // Iterates over the routes and verify the related next objectives
1381 srManager.multicastRouteService.getRoutes()
1382 .stream()
1383 .map(McastRoute::group)
1384 .forEach(mcastIp -> {
1385 log.trace("Running mcast buckets corrector for mcast group: {}",
1386 mcastIp);
1387
1388 // For each group we get current information in the store
1389 // and issue a check of the next objectives in place
1390 DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
1391 .stream().findAny().orElse(null);
1392 DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT)
1393 .stream().findAny().orElse(null);
1394 Set<DeviceId> egressDevices = getDevice(mcastIp, McastRole.EGRESS);
1395 ConnectPoint source = getSource(mcastIp);
1396
1397 // Do not proceed if ingress device or source of this group are missing
1398 if (ingressDevice == null || source == null) {
1399 log.warn("Unable to run buckets corrector. " +
1400 "Missing ingress {} or source {} for group {}",
1401 ingressDevice, source, mcastIp);
1402 return;
1403 }
1404
1405 // Continue only when this instance is the master of source device
1406 if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
1407 log.trace("Unable to run buckets corrector. " +
1408 "Skip {} due to lack of mastership " +
1409 "of the source device {}",
1410 mcastIp, source.deviceId());
1411 return;
1412 }
1413
1414 // Create the set of the devices to be processed
1415 ImmutableSet.Builder<DeviceId> devicesBuilder = ImmutableSet.builder();
1416 devicesBuilder.add(ingressDevice);
1417 if (transitDevice != null) {
1418 devicesBuilder.add(transitDevice);
1419 }
1420 if (!egressDevices.isEmpty()) {
1421 devicesBuilder.addAll(egressDevices);
1422 }
1423 Set<DeviceId> devicesToProcess = devicesBuilder.build();
1424
1425 // Iterate over the devices
1426 devicesToProcess.forEach(deviceId -> {
1427 McastStoreKey currentKey = new McastStoreKey(mcastIp, deviceId);
1428 // If next exists in our store verify related next objective
1429 if (mcastNextObjStore.containsKey(currentKey)) {
1430 NextObjective currentNext = mcastNextObjStore.get(currentKey).value();
1431 // Get current ports
1432 Set<PortNumber> currentPorts = getPorts(currentNext.next());
1433 // Rebuild the next objective
1434 currentNext = nextObjBuilder(
1435 mcastIp,
1436 assignedVlan(deviceId.equals(source.deviceId()) ? source : null),
1437 currentPorts,
1438 currentNext.id()
1439 ).verify();
1440 // Send to the flowobjective service
1441 srManager.flowObjectiveService.next(deviceId, currentNext);
1442 } else {
Pier Luigi80cb7482018-02-15 16:33:08 +01001443 log.warn("Unable to run buckets corrector. " +
Pier Luigi3dfd8352018-01-25 16:16:02 +01001444 "Missing next for {} and group {}",
1445 deviceId, mcastIp);
1446 }
1447 });
1448
1449 });
1450 } finally {
1451 // Finally, it releases the lock
1452 mcastUnlock();
1453 }
1454
1455 }
Jonghwan Hyun42fe1052017-08-25 17:48:36 -07001456 }
Pier Luigi0b14d6c2018-01-15 18:06:43 +01001457
1458 public Map<McastStoreKey, Integer> getMcastNextIds(IpAddress mcastIp) {
1459 // If mcast ip is present
1460 if (mcastIp != null) {
1461 return mcastNextObjStore.entrySet().stream()
1462 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
1463 .collect(Collectors.toMap(Map.Entry::getKey,
1464 entry -> entry.getValue().value().id()));
1465 }
1466 // Otherwise take all the groups
1467 return mcastNextObjStore.entrySet().stream()
1468 .collect(Collectors.toMap(Map.Entry::getKey,
1469 entry -> entry.getValue().value().id()));
1470 }
1471
Pier Luigi004d7cf2018-02-28 12:10:50 +01001472 public Map<McastStoreKey, McastRole> getMcastRoles(IpAddress mcastIp) {
Pier Luigi0b14d6c2018-01-15 18:06:43 +01001473 // If mcast ip is present
1474 if (mcastIp != null) {
1475 return mcastRoleStore.entrySet().stream()
1476 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
1477 .collect(Collectors.toMap(Map.Entry::getKey,
1478 entry -> entry.getValue().value()));
1479 }
1480 // Otherwise take all the groups
1481 return mcastRoleStore.entrySet().stream()
1482 .collect(Collectors.toMap(Map.Entry::getKey,
1483 entry -> entry.getValue().value()));
1484 }
1485
1486 public Map<ConnectPoint, List<ConnectPoint>> getMcastPaths(IpAddress mcastIp) {
1487 Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
1488 // Get the source
1489 ConnectPoint source = getSource(mcastIp);
1490 // Source cannot be null, we don't know the starting point
1491 if (source != null) {
1492 // Init steps
1493 Set<DeviceId> visited = Sets.newHashSet();
1494 List<ConnectPoint> currentPath = Lists.newArrayList(
1495 source
1496 );
1497 // Build recursively the mcast paths
1498 buildMcastPaths(source.deviceId(), visited, mcastPaths, currentPath, mcastIp);
1499 }
1500 return mcastPaths;
1501 }
1502
1503 private void buildMcastPaths(DeviceId toVisit, Set<DeviceId> visited,
1504 Map<ConnectPoint, List<ConnectPoint>> mcastPaths,
1505 List<ConnectPoint> currentPath, IpAddress mcastIp) {
1506 // If we have visited the node to visit
1507 // there is a loop
1508 if (visited.contains(toVisit)) {
1509 return;
1510 }
1511 // Visit next-hop
1512 visited.add(toVisit);
1513 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, toVisit);
1514 // Looking for next-hops
1515 if (mcastNextObjStore.containsKey(mcastStoreKey)) {
1516 // Build egress connectpoints
1517 NextObjective nextObjective = mcastNextObjStore.get(mcastStoreKey).value();
1518 // Get Ports
1519 Set<PortNumber> outputPorts = getPorts(nextObjective.next());
1520 // Build relative cps
1521 ImmutableSet.Builder<ConnectPoint> cpBuilder = ImmutableSet.builder();
1522 outputPorts.forEach(portNumber -> cpBuilder.add(new ConnectPoint(toVisit, portNumber)));
1523 Set<ConnectPoint> egressPoints = cpBuilder.build();
1524 // Define other variables for the next steps
1525 Set<Link> egressLinks;
1526 List<ConnectPoint> newCurrentPath;
1527 Set<DeviceId> newVisited;
1528 DeviceId newToVisit;
1529 for (ConnectPoint egressPoint : egressPoints) {
1530 egressLinks = srManager.linkService.getEgressLinks(egressPoint);
1531 // If it does not have egress links, stop
1532 if (egressLinks.isEmpty()) {
1533 // Add the connect points to the path
1534 newCurrentPath = Lists.newArrayList(currentPath);
1535 newCurrentPath.add(0, egressPoint);
1536 // Save in the map
1537 mcastPaths.put(egressPoint, newCurrentPath);
1538 } else {
1539 newVisited = Sets.newHashSet(visited);
1540 // Iterate over the egress links for the next hops
1541 for (Link egressLink : egressLinks) {
1542 // Update to visit
1543 newToVisit = egressLink.dst().deviceId();
1544 // Add the connect points to the path
1545 newCurrentPath = Lists.newArrayList(currentPath);
1546 newCurrentPath.add(0, egressPoint);
1547 newCurrentPath.add(0, egressLink.dst());
1548 // Go to the next hop
1549 buildMcastPaths(newToVisit, newVisited, mcastPaths, newCurrentPath, mcastIp);
1550 }
1551 }
1552 }
1553 }
1554 }
1555
Charles Chand55e84d2016-03-30 17:54:24 -07001556}