blob: 968ad3b5b464ec435cc77de2299fac33509c2375 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
Pier Luigi69f774d2018-02-28 12:10:50 +010017package org.onosproject.segmentrouting.mcast;
Charles Chanc91c8782016-03-30 17:54:24 -070018
Pier Luigid29ca7c2018-02-28 17:24:03 +010019import com.google.common.cache.Cache;
20import com.google.common.cache.CacheBuilder;
21import com.google.common.cache.RemovalCause;
22import com.google.common.cache.RemovalNotification;
Charles Chanc91c8782016-03-30 17:54:24 -070023import com.google.common.collect.ImmutableSet;
24import com.google.common.collect.Lists;
Pier Luigi91573e12018-01-23 16:06:38 +010025import com.google.common.collect.Maps;
Charles Chanc91c8782016-03-30 17:54:24 -070026import com.google.common.collect.Sets;
27import org.onlab.packet.Ethernet;
28import org.onlab.packet.IpAddress;
29import org.onlab.packet.IpPrefix;
30import org.onlab.packet.MacAddress;
31import org.onlab.packet.VlanId;
32import org.onlab.util.KryoNamespace;
Pier Luigi580fd8a2018-01-16 10:47:50 +010033import org.onosproject.cluster.NodeId;
Charles Chanc91c8782016-03-30 17:54:24 -070034import org.onosproject.core.ApplicationId;
35import org.onosproject.core.CoreService;
Pier1f87aca2018-03-14 16:47:32 -070036import org.onosproject.mcast.api.McastEvent;
37import org.onosproject.mcast.api.McastRoute;
38import org.onosproject.mcast.api.McastRouteUpdate;
Ray Milkeyae0068a2017-08-15 11:02:29 -070039import org.onosproject.net.config.basics.McastConfig;
Charles Chanc91c8782016-03-30 17:54:24 -070040import org.onosproject.net.ConnectPoint;
41import org.onosproject.net.DeviceId;
42import org.onosproject.net.Link;
43import org.onosproject.net.Path;
44import org.onosproject.net.PortNumber;
45import org.onosproject.net.flow.DefaultTrafficSelector;
46import org.onosproject.net.flow.DefaultTrafficTreatment;
47import org.onosproject.net.flow.TrafficSelector;
48import org.onosproject.net.flow.TrafficTreatment;
49import org.onosproject.net.flow.criteria.Criteria;
Pier Luigid29ca7c2018-02-28 17:24:03 +010050import org.onosproject.net.flow.criteria.VlanIdCriterion;
Charles Chanc91c8782016-03-30 17:54:24 -070051import org.onosproject.net.flow.instructions.Instructions.OutputInstruction;
52import org.onosproject.net.flowobjective.DefaultFilteringObjective;
53import org.onosproject.net.flowobjective.DefaultForwardingObjective;
54import org.onosproject.net.flowobjective.DefaultNextObjective;
Charles Chan72779502016-04-23 17:36:10 -070055import org.onosproject.net.flowobjective.DefaultObjectiveContext;
Charles Chanc91c8782016-03-30 17:54:24 -070056import org.onosproject.net.flowobjective.FilteringObjective;
57import org.onosproject.net.flowobjective.ForwardingObjective;
58import org.onosproject.net.flowobjective.NextObjective;
Charles Chan72779502016-04-23 17:36:10 -070059import org.onosproject.net.flowobjective.ObjectiveContext;
Pier1f87aca2018-03-14 16:47:32 -070060import org.onosproject.net.topology.LinkWeigher;
Pier Luigid8a15162018-02-15 16:33:08 +010061import org.onosproject.net.topology.Topology;
Charles Chanc91c8782016-03-30 17:54:24 -070062import org.onosproject.net.topology.TopologyService;
Pier1f87aca2018-03-14 16:47:32 -070063import org.onosproject.segmentrouting.SRLinkWeigher;
Pier Luigi69f774d2018-02-28 12:10:50 +010064import org.onosproject.segmentrouting.SegmentRoutingManager;
65import org.onosproject.segmentrouting.SegmentRoutingService;
Pier Luigi51ee7c02018-02-23 19:57:40 +010066import org.onosproject.segmentrouting.config.DeviceConfigNotFoundException;
Charles Chan370a65b2016-05-10 17:29:47 -070067import org.onosproject.segmentrouting.config.SegmentRoutingAppConfig;
Charles Chan72779502016-04-23 17:36:10 -070068import org.onosproject.segmentrouting.storekey.McastStoreKey;
Charles Chanc91c8782016-03-30 17:54:24 -070069import org.onosproject.store.serializers.KryoNamespaces;
70import org.onosproject.store.service.ConsistentMap;
71import org.onosproject.store.service.Serializer;
72import org.onosproject.store.service.StorageService;
Pier Luigi580fd8a2018-01-16 10:47:50 +010073import org.onosproject.store.service.Versioned;
Charles Chanc91c8782016-03-30 17:54:24 -070074import org.slf4j.Logger;
75import org.slf4j.LoggerFactory;
76
Pier Luigi35dab3f2018-01-25 16:16:02 +010077import java.time.Instant;
Charles Chanc91c8782016-03-30 17:54:24 -070078import java.util.Collection;
79import java.util.Collections;
Pier Luigi91573e12018-01-23 16:06:38 +010080import java.util.Comparator;
Charles Chanc91c8782016-03-30 17:54:24 -070081import java.util.List;
Charles Chan72779502016-04-23 17:36:10 -070082import java.util.Map;
Pier1f87aca2018-03-14 16:47:32 -070083import java.util.Map.Entry;
Charles Chanc91c8782016-03-30 17:54:24 -070084import java.util.Optional;
85import java.util.Set;
Pier Luigi35dab3f2018-01-25 16:16:02 +010086import java.util.concurrent.ScheduledExecutorService;
87import java.util.concurrent.TimeUnit;
88import java.util.concurrent.locks.Lock;
89import java.util.concurrent.locks.ReentrantLock;
Charles Chan72779502016-04-23 17:36:10 -070090import java.util.stream.Collectors;
91
Pier Luigi35dab3f2018-01-25 16:16:02 +010092import static java.util.concurrent.Executors.newScheduledThreadPool;
93import static org.onlab.util.Tools.groupedThreads;
Pier1f87aca2018-03-14 16:47:32 -070094
95import static org.onosproject.mcast.api.McastEvent.Type.*;
Pier Luigid29ca7c2018-02-28 17:24:03 +010096import static org.onosproject.net.flow.criteria.Criterion.Type.VLAN_VID;
Charles Chan10b0fb72017-02-02 16:20:42 -080097import static org.onosproject.segmentrouting.SegmentRoutingManager.INTERNAL_VLAN;
Pier979e61a2018-03-07 11:42:50 +010098import static org.onosproject.segmentrouting.mcast.McastRole.EGRESS;
99import static org.onosproject.segmentrouting.mcast.McastRole.INGRESS;
100import static org.onosproject.segmentrouting.mcast.McastRole.TRANSIT;
Charles Chanc91c8782016-03-30 17:54:24 -0700101
102/**
Pier Luigi69f774d2018-02-28 12:10:50 +0100103 * Handles Multicast related events.
Charles Chanc91c8782016-03-30 17:54:24 -0700104 */
Charles Chan1eaf4802016-04-18 13:44:03 -0700105public class McastHandler {
106 private static final Logger log = LoggerFactory.getLogger(McastHandler.class);
Charles Chanc91c8782016-03-30 17:54:24 -0700107 private final SegmentRoutingManager srManager;
108 private final ApplicationId coreAppId;
Charles Chan82f19972016-05-17 13:13:55 -0700109 private final StorageService storageService;
110 private final TopologyService topologyService;
Charles Chan72779502016-04-23 17:36:10 -0700111 private final ConsistentMap<McastStoreKey, NextObjective> mcastNextObjStore;
112 private final KryoNamespace.Builder mcastKryo;
113 private final ConsistentMap<McastStoreKey, McastRole> mcastRoleStore;
114
Pier Luigid29ca7c2018-02-28 17:24:03 +0100115 // Wait time for the cache
116 private static final int WAIT_TIME_MS = 1000;
117 /**
118 * The mcastEventCache is implemented to avoid race condition by giving more time to the
119 * underlying subsystems to process previous calls.
120 */
121 private Cache<McastCacheKey, McastEvent> mcastEventCache = CacheBuilder.newBuilder()
122 .expireAfterWrite(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
123 .removalListener((RemovalNotification<McastCacheKey, McastEvent> notification) -> {
124 // Get group ip, sink and related event
125 IpAddress mcastIp = notification.getKey().mcastIp();
126 ConnectPoint sink = notification.getKey().sink();
127 McastEvent mcastEvent = notification.getValue();
128 RemovalCause cause = notification.getCause();
129 log.debug("mcastEventCache removal event. group={}, sink={}, mcastEvent={}, cause={}",
130 mcastIp, sink, mcastEvent, cause);
131 // If it expires or it has been replaced, we deque the event
132 switch (notification.getCause()) {
133 case REPLACED:
134 case EXPIRED:
135 dequeueMcastEvent(mcastEvent);
136 break;
137 default:
138 break;
139 }
140 }).build();
141
142 private void enqueueMcastEvent(McastEvent mcastEvent) {
143 log.debug("Enqueue mcastEvent {}", mcastEvent);
Pier1f87aca2018-03-14 16:47:32 -0700144 final McastRouteUpdate mcastRouteUpdate = mcastEvent.subject();
145 final McastRouteUpdate prevUpdate = mcastEvent.prevSubject();
146 final IpAddress group = prevUpdate.route().group();
Pier Luigid29ca7c2018-02-28 17:24:03 +0100147 // Let's create the keys of the cache
148 ImmutableSet.Builder<ConnectPoint> sinksBuilder = ImmutableSet.builder();
Pier1f87aca2018-03-14 16:47:32 -0700149 Set<ConnectPoint> sinks;
Pier Luigid29ca7c2018-02-28 17:24:03 +0100150 // For this event we will have a set of sinks
Pier1f87aca2018-03-14 16:47:32 -0700151 if (mcastEvent.type() == SOURCES_ADDED ||
152 mcastEvent.type() == SOURCES_REMOVED) {
153 // FIXME To be addressed with multiple sources support
154 sinks = mcastRouteUpdate.sinks()
155 .values()
156 .stream()
157 .flatMap(Collection::stream)
158 .collect(Collectors.toSet());
Pier Luigid29ca7c2018-02-28 17:24:03 +0100159 } else {
Pier1f87aca2018-03-14 16:47:32 -0700160 Set<ConnectPoint> prevSinks = prevUpdate.sinks()
161 .values()
162 .stream()
163 .flatMap(Collection::stream)
164 .collect(Collectors.toSet());
165 if (mcastEvent.type() == ROUTE_REMOVED) {
166 // Get the old sinks, since current subject is null
167 sinks = prevSinks;
168 } else {
169 // Get new sinks
170 Set<ConnectPoint> newsinks = mcastRouteUpdate.sinks()
171 .values()
172 .stream()
173 .flatMap(Collection::stream)
174 .collect(Collectors.toSet());
175 // If it is a SINKS_ADDED event
176 if (mcastEvent.type() == SINKS_ADDED) {
177 // Let's do the difference between current and prev subjects
178 sinks = Sets.difference(newsinks, prevSinks);
179 } else {
180 // Let's do the difference between prev and current subjects
181 sinks = Sets.difference(prevSinks, newsinks);
182 }
Pier Luigid29ca7c2018-02-28 17:24:03 +0100183 }
184 }
Pier1f87aca2018-03-14 16:47:32 -0700185 // Add all the sinks
186 sinksBuilder.addAll(sinks);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100187 // Push the elements in the cache
188 sinksBuilder.build().forEach(sink -> {
Pier1f87aca2018-03-14 16:47:32 -0700189 McastCacheKey cacheKey = new McastCacheKey(group, sink);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100190 mcastEventCache.put(cacheKey, mcastEvent);
191 });
192 }
193
194 private void dequeueMcastEvent(McastEvent mcastEvent) {
195 log.debug("Dequeue mcastEvent {}", mcastEvent);
Pier1f87aca2018-03-14 16:47:32 -0700196 final McastRouteUpdate mcastRouteUpdate = mcastEvent.subject();
197 final McastRouteUpdate prevUpdate = mcastEvent.prevSubject();
Pier Luigid29ca7c2018-02-28 17:24:03 +0100198 // Get source, mcast group
Pier1f87aca2018-03-14 16:47:32 -0700199 // FIXME To be addressed with multiple sources support
200 ConnectPoint prevSource = prevUpdate.sources()
201 .stream()
202 .findFirst()
203 .orElse(null);
204 IpAddress mcastIp = prevUpdate.route().group();
205 Set<ConnectPoint> prevSinks = prevUpdate.sinks()
206 .values()
207 .stream()
208 .flatMap(Collection::stream)
209 .collect(Collectors.toSet());
210 Set<ConnectPoint> newSinks;
211 // Sinks to handled by SINKS_ADDED and SINKS_REMOVED procedures
212 Set<ConnectPoint> sinks;
Pier Luigid29ca7c2018-02-28 17:24:03 +0100213 // According to the event type let's call the proper method
214 switch (mcastEvent.type()) {
Pier1f87aca2018-03-14 16:47:32 -0700215 case SOURCES_ADDED:
216 // FIXME To be addressed with multiple sources support
217 // Get all the sinks
218 //Set<ConnectPoint> sinks = mcastRouteInfo.sinks();
219 // Compute the Mcast tree
220 //Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(source.deviceId(), sinks);
221 // Process the given sinks using the pre-computed paths
222 //mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink, mcastIp, paths));
Pier Luigid29ca7c2018-02-28 17:24:03 +0100223 break;
Pier1f87aca2018-03-14 16:47:32 -0700224 case SOURCES_REMOVED:
225 // FIXME To be addressed with multiple sources support
Pier Luigid29ca7c2018-02-28 17:24:03 +0100226 // Get old source
Pier1f87aca2018-03-14 16:47:32 -0700227 //ConnectPoint oldSource = mcastEvent.prevSubject().source().orElse(null);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100228 // Just the first cached element will be processed
Pier1f87aca2018-03-14 16:47:32 -0700229 //processSourceUpdatedInternal(mcastIp, source, oldSource);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100230 break;
231 case ROUTE_REMOVED:
232 // Process the route removed, just the first cached element will be processed
Pier1f87aca2018-03-14 16:47:32 -0700233 processRouteRemovedInternal(prevSource, mcastIp);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100234 break;
Pier1f87aca2018-03-14 16:47:32 -0700235 case SINKS_ADDED:
236 // Get the only sinks to be processed (new ones)
237 newSinks = mcastRouteUpdate.sinks()
238 .values()
239 .stream()
240 .flatMap(Collection::stream)
241 .collect(Collectors.toSet());
242 sinks = Sets.difference(newSinks, prevSinks);
243 sinks.forEach(sink -> processSinkAddedInternal(prevSource, sink, mcastIp, null));
Pier Luigid29ca7c2018-02-28 17:24:03 +0100244 break;
Pier1f87aca2018-03-14 16:47:32 -0700245 case SINKS_REMOVED:
246 // Get the only sinks to be processed (old ones)
247 newSinks = mcastRouteUpdate.sinks()
248 .values()
249 .stream()
250 .flatMap(Collection::stream)
251 .collect(Collectors.toSet());
252 sinks = Sets.difference(prevSinks, newSinks);
253 sinks.forEach(sink -> processSinkRemovedInternal(prevSource, sink, mcastIp));
Pier Luigid29ca7c2018-02-28 17:24:03 +0100254 break;
255 default:
256 break;
257 }
258 }
259
Pier Luigi35dab3f2018-01-25 16:16:02 +0100260 // Mcast lock to serialize local operations
261 private final Lock mcastLock = new ReentrantLock();
262
263 /**
264 * Acquires the lock used when making mcast changes.
265 */
266 private void mcastLock() {
267 mcastLock.lock();
268 }
269
270 /**
271 * Releases the lock used when making mcast changes.
272 */
273 private void mcastUnlock() {
274 mcastLock.unlock();
275 }
276
277 // Stability threshold for Mcast. Seconds
278 private static final long MCAST_STABLITY_THRESHOLD = 5;
279 // Last change done
280 private Instant lastMcastChange = Instant.now();
281
282 /**
283 * Determines if mcast in the network has been stable in the last
284 * MCAST_STABLITY_THRESHOLD seconds, by comparing the current time
285 * to the last mcast change timestamp.
286 *
287 * @return true if stable
288 */
289 private boolean isMcastStable() {
290 long last = (long) (lastMcastChange.toEpochMilli() / 1000.0);
291 long now = (long) (Instant.now().toEpochMilli() / 1000.0);
Saurav Das97241862018-02-14 14:14:54 -0800292 log.trace("Mcast stable since {}s", now - last);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100293 return (now - last) > MCAST_STABLITY_THRESHOLD;
294 }
295
// Delay, in seconds, between two consecutive runs of the bucket corrector
private static final long MCAST_VERIFY_INTERVAL = 30;

// Scheduler (one core thread) running the bucket corrector and the cache clean up
private ScheduledExecutorService executorService
        = newScheduledThreadPool(1, groupedThreads("mcastWorker", "mcastWorker-%d", log));

/**
 * Constructs the McastHandler.
 * <p>
 * Besides wiring the services exposed by the manager, it builds the two
 * distributed stores and schedules the periodic tasks: the bucket corrector
 * (first run after 10 seconds, then every MCAST_VERIFY_INTERVAL seconds after
 * the previous run completes) and the event cache clean up (every
 * WAIT_TIME_MS milliseconds, starting immediately).
 *
 * @param srManager Segment Routing manager
 */
public McastHandler(SegmentRoutingManager srManager) {
    this.srManager = srManager;
    this.coreAppId = srManager.coreService.getAppId(CoreService.CORE_APP_NAME);
    this.storageService = srManager.storageService;
    this.topologyService = srManager.topologyService;

    // Serialization namespace shared by both stores
    this.mcastKryo = new KryoNamespace.Builder()
            .register(KryoNamespaces.API)
            .register(McastStoreKey.class)
            .register(McastRole.class);

    // (group, device) -> next objective
    this.mcastNextObjStore = storageService
            .<McastStoreKey, NextObjective>consistentMapBuilder()
            .withName("onos-mcast-nextobj-store")
            .withSerializer(Serializer.using(mcastKryo.build("McastHandler-NextObj")))
            .build();

    // (group, device) -> mcast role
    this.mcastRoleStore = storageService
            .<McastStoreKey, McastRole>consistentMapBuilder()
            .withName("onos-mcast-role-store")
            .withSerializer(Serializer.using(mcastKryo.build("McastHandler-Role")))
            .build();

    // Start the buckets corrector
    executorService.scheduleWithFixedDelay(new McastBucketCorrector(), 10,
                                           MCAST_VERIFY_INTERVAL,
                                           TimeUnit.SECONDS);
    // Periodic clean up of the cache, this is what triggers the processing of the expired events
    executorService.scheduleAtFixedRate(mcastEventCache::cleanUp, 0,
                                        WAIT_TIME_MS, TimeUnit.MILLISECONDS);
}
335
336 /**
337 * Read initial multicast from mcast store.
338 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100339 public void init() {
Charles Chan72779502016-04-23 17:36:10 -0700340 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
Pier1f87aca2018-03-14 16:47:32 -0700341 // FIXME To be addressed with multiple sources support
342 ConnectPoint source = srManager.multicastRouteService.sources(mcastRoute)
343 .stream()
344 .findFirst()
345 .orElse(null);
346 Set<ConnectPoint> sinks = srManager.multicastRouteService.sinks(mcastRoute);
347 // Filter out all the working sinks, we do not want to move them
348 sinks = sinks.stream()
349 .filter(sink -> {
350 McastStoreKey mcastKey = new McastStoreKey(mcastRoute.group(), sink.deviceId());
351 Versioned<NextObjective> verMcastNext = mcastNextObjStore.get(mcastKey);
352 return verMcastNext == null ||
353 !getPorts(verMcastNext.value().next()).contains(sink.port());
354 })
355 .collect(Collectors.toSet());
356 // Compute the Mcast tree
357 Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(source.deviceId(), sinks);
358 // Process the given sinks using the pre-computed paths
359 mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink,
360 mcastRoute.group(), paths));
Charles Chan72779502016-04-23 17:36:10 -0700361 });
Charles Chanc91c8782016-03-30 17:54:24 -0700362 }
363
364 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +0100365 * Clean up when deactivating the application.
366 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100367 public void terminate() {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100368 executorService.shutdown();
369 }
370
371 /**
Pier Luigid29ca7c2018-02-28 17:24:03 +0100372 * Processes the SOURCE_ADDED, SOURCE_UPDATED, SINK_ADDED,
373 * SINK_REMOVED and ROUTE_REMOVED events.
Charles Chanc91c8782016-03-30 17:54:24 -0700374 *
375 * @param event McastEvent with SOURCE_ADDED type
376 */
Pier Luigid29ca7c2018-02-28 17:24:03 +0100377 public void processMcastEvent(McastEvent event) {
378 log.info("process {}", event);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100379 // Just enqueue for now
380 enqueueMcastEvent(event);
Pier Luigi6786b922018-02-02 16:19:11 +0100381 }
382
383 /**
Pier Luigie80d6b42018-02-26 12:31:38 +0100384 * Process the SOURCE_UPDATED event.
385 *
386 * @param newSource the updated srouce info
387 * @param oldSource the outdated source info
388 */
389 private void processSourceUpdatedInternal(IpAddress mcastIp,
390 ConnectPoint newSource,
391 ConnectPoint oldSource) {
392 lastMcastChange = Instant.now();
393 mcastLock();
394 try {
395 log.debug("Processing source updated for group {}", mcastIp);
396
397 // Build key for the store and retrieve old data
398 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, oldSource.deviceId());
399
400 // Verify leadership on the operation
401 if (!isLeader(oldSource)) {
402 log.debug("Skip {} due to lack of leadership", mcastIp);
403 return;
404 }
405
406 // This device is not serving this multicast group
407 if (!mcastRoleStore.containsKey(mcastStoreKey) ||
408 !mcastNextObjStore.containsKey(mcastStoreKey)) {
409 log.warn("{} is not serving {}. Abort.", oldSource.deviceId(), mcastIp);
410 return;
411 }
412 NextObjective nextObjective = mcastNextObjStore.get(mcastStoreKey).value();
413 Set<PortNumber> outputPorts = getPorts(nextObjective.next());
414
Pier Luigid29ca7c2018-02-28 17:24:03 +0100415 // This an optimization to avoid unnecessary removal and add
416 if (!assignedVlanFromNext(nextObjective).equals(assignedVlan(newSource))) {
417 // Let's remove old flows and groups
418 removeGroupFromDevice(oldSource.deviceId(), mcastIp, assignedVlan(oldSource));
419 // Push new flows and group
420 outputPorts.forEach(portNumber -> addPortToDevice(newSource.deviceId(), portNumber,
421 mcastIp, assignedVlan(newSource)));
422 }
Pier Luigie80d6b42018-02-26 12:31:38 +0100423 addFilterToDevice(newSource.deviceId(), newSource.port(),
Pier979e61a2018-03-07 11:42:50 +0100424 assignedVlan(newSource), mcastIp, INGRESS);
Pier Luigie80d6b42018-02-26 12:31:38 +0100425 // Setup mcast roles
426 mcastRoleStore.put(new McastStoreKey(mcastIp, newSource.deviceId()),
Pier979e61a2018-03-07 11:42:50 +0100427 INGRESS);
Pier Luigie80d6b42018-02-26 12:31:38 +0100428 } finally {
429 mcastUnlock();
430 }
431 }
432
433 /**
Pier Luigi6786b922018-02-02 16:19:11 +0100434 * Removes the entire mcast tree related to this group.
435 *
436 * @param mcastIp multicast group IP address
437 */
438 private void processRouteRemovedInternal(ConnectPoint source, IpAddress mcastIp) {
439 lastMcastChange = Instant.now();
440 mcastLock();
441 try {
Pier Luigie80d6b42018-02-26 12:31:38 +0100442 log.debug("Processing route removed for group {}", mcastIp);
Pier Luigi6786b922018-02-02 16:19:11 +0100443
444 // Find out the ingress, transit and egress device of the affected group
Pier979e61a2018-03-07 11:42:50 +0100445 DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
Pier Luigi6786b922018-02-02 16:19:11 +0100446 .stream().findAny().orElse(null);
Pier1a7e0c02018-03-12 15:00:54 -0700447 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT);
Pier979e61a2018-03-07 11:42:50 +0100448 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS);
Pier Luigi6786b922018-02-02 16:19:11 +0100449
450 // Verify leadership on the operation
451 if (!isLeader(source)) {
452 log.debug("Skip {} due to lack of leadership", mcastIp);
453 return;
454 }
455
Pier1a7e0c02018-03-12 15:00:54 -0700456 // If there are no egress devices, sinks could be only on the ingress
Pier Luigi6786b922018-02-02 16:19:11 +0100457 if (!egressDevices.isEmpty()) {
458 egressDevices.forEach(
459 deviceId -> removeGroupFromDevice(deviceId, mcastIp, assignedVlan(null))
460 );
461 }
Pier1a7e0c02018-03-12 15:00:54 -0700462 // Transit could be empty if sinks are on the ingress
463 if (!transitDevices.isEmpty()) {
464 transitDevices.forEach(
465 deviceId -> removeGroupFromDevice(deviceId, mcastIp, assignedVlan(null))
466 );
Pier Luigi6786b922018-02-02 16:19:11 +0100467 }
468 // Ingress device should be not null
469 if (ingressDevice != null) {
470 removeGroupFromDevice(ingressDevice, mcastIp, assignedVlan(source));
471 }
Pier Luigi6786b922018-02-02 16:19:11 +0100472 } finally {
473 mcastUnlock();
474 }
475 }
476
477 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +0100478 * Removes a path from source to sink for given multicast group.
479 *
480 * @param source connect point of the multicast source
481 * @param sink connection point of the multicast sink
482 * @param mcastIp multicast group IP address
483 */
484 private void processSinkRemovedInternal(ConnectPoint source, ConnectPoint sink,
485 IpAddress mcastIp) {
486 lastMcastChange = Instant.now();
487 mcastLock();
488 try {
Pier Luigi6786b922018-02-02 16:19:11 +0100489 // Verify leadership on the operation
490 if (!isLeader(source)) {
491 log.debug("Skip {} due to lack of leadership", mcastIp);
Charles Chanc91c8782016-03-30 17:54:24 -0700492 return;
493 }
Charles Chanc91c8782016-03-30 17:54:24 -0700494
Pier Luigi92e69be2018-03-02 12:53:37 +0100495 boolean isLast = false;
Pier Luigi35dab3f2018-01-25 16:16:02 +0100496 // When source and sink are on the same device
497 if (source.deviceId().equals(sink.deviceId())) {
498 // Source and sink are on even the same port. There must be something wrong.
499 if (source.port().equals(sink.port())) {
500 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
501 mcastIp, sink, source);
502 return;
503 }
Pier Luigi92e69be2018-03-02 12:53:37 +0100504 isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(source));
505 if (isLast) {
506 mcastRoleStore.remove(new McastStoreKey(mcastIp, sink.deviceId()));
507 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100508 return;
509 }
Charles Chanc91c8782016-03-30 17:54:24 -0700510
Pier Luigi35dab3f2018-01-25 16:16:02 +0100511 // Process the egress device
Pier Luigi92e69be2018-03-02 12:53:37 +0100512 isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(null));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100513 if (isLast) {
514 mcastRoleStore.remove(new McastStoreKey(mcastIp, sink.deviceId()));
515 }
516
517 // If this is the last sink on the device, also update upstream
Pier1f87aca2018-03-14 16:47:32 -0700518 Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(),
519 mcastIp, null);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100520 if (mcastPath.isPresent()) {
521 List<Link> links = Lists.newArrayList(mcastPath.get().links());
522 Collections.reverse(links);
523 for (Link link : links) {
524 if (isLast) {
525 isLast = removePortFromDevice(
526 link.src().deviceId(),
527 link.src().port(),
528 mcastIp,
529 assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null)
530 );
Pier Luigi92e69be2018-03-02 12:53:37 +0100531 if (isLast) {
532 mcastRoleStore.remove(new McastStoreKey(mcastIp, link.src().deviceId()));
533 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100534 }
Charles Chanc91c8782016-03-30 17:54:24 -0700535 }
536 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100537 } finally {
538 mcastUnlock();
Charles Chanc91c8782016-03-30 17:54:24 -0700539 }
540 }
541
542 /**
543 * Establishes a path from source to sink for given multicast group.
544 *
545 * @param source connect point of the multicast source
546 * @param sink connection point of the multicast sink
547 * @param mcastIp multicast group IP address
548 */
549 private void processSinkAddedInternal(ConnectPoint source, ConnectPoint sink,
Pier1f87aca2018-03-14 16:47:32 -0700550 IpAddress mcastIp, List<Path> allPaths) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100551 lastMcastChange = Instant.now();
552 mcastLock();
553 try {
554 // Continue only when this instance is the master of source device
555 if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
556 log.debug("Skip {} due to lack of mastership of the source device {}",
557 mcastIp, source.deviceId());
Charles Chanc91c8782016-03-30 17:54:24 -0700558 return;
559 }
Charles Chanc91c8782016-03-30 17:54:24 -0700560
Pier Luigi35dab3f2018-01-25 16:16:02 +0100561 // Process the ingress device
Pier979e61a2018-03-07 11:42:50 +0100562 addFilterToDevice(source.deviceId(), source.port(),
563 assignedVlan(source), mcastIp, INGRESS);
Charles Chan72779502016-04-23 17:36:10 -0700564
Pier Luigi35dab3f2018-01-25 16:16:02 +0100565 // When source and sink are on the same device
566 if (source.deviceId().equals(sink.deviceId())) {
567 // Source and sink are on even the same port. There must be something wrong.
568 if (source.port().equals(sink.port())) {
569 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
570 mcastIp, sink, source);
571 return;
572 }
573 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(source));
Pier979e61a2018-03-07 11:42:50 +0100574 mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()), INGRESS);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100575 return;
576 }
Charles Chan72779502016-04-23 17:36:10 -0700577
Pier Luigi35dab3f2018-01-25 16:16:02 +0100578 // Find a path. If present, create/update groups and flows for each hop
Pier1f87aca2018-03-14 16:47:32 -0700579 Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(),
580 mcastIp, allPaths);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100581 if (mcastPath.isPresent()) {
582 List<Link> links = mcastPath.get().links();
Charles Chan72779502016-04-23 17:36:10 -0700583
Pier1a7e0c02018-03-12 15:00:54 -0700584 // Setup mcast role for ingress
585 mcastRoleStore.put(new McastStoreKey(mcastIp, source.deviceId()),
586 INGRESS);
587
588 // Setup properly the transit
Pier Luigi35dab3f2018-01-25 16:16:02 +0100589 links.forEach(link -> {
590 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
591 assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
Pier979e61a2018-03-07 11:42:50 +0100592 addFilterToDevice(link.dst().deviceId(), link.dst().port(),
593 assignedVlan(null), mcastIp, null);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100594 });
595
Pier1a7e0c02018-03-12 15:00:54 -0700596 // Setup mcast role for the transit
597 links.stream()
598 .filter(link -> !link.dst().deviceId().equals(sink.deviceId()))
599 .forEach(link -> mcastRoleStore.put(new McastStoreKey(mcastIp, link.dst().deviceId()),
600 TRANSIT));
601
Pier Luigi35dab3f2018-01-25 16:16:02 +0100602 // Process the egress device
603 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(null));
Pier1a7e0c02018-03-12 15:00:54 -0700604 // Setup mcast role for egress
Pier Luigi35dab3f2018-01-25 16:16:02 +0100605 mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()),
Pier979e61a2018-03-07 11:42:50 +0100606 EGRESS);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100607 } else {
608 log.warn("Unable to find a path from {} to {}. Abort sinkAdded",
609 source.deviceId(), sink.deviceId());
610 }
611 } finally {
612 mcastUnlock();
Charles Chanc91c8782016-03-30 17:54:24 -0700613 }
614 }
615
616 /**
Charles Chan72779502016-04-23 17:36:10 -0700617 * Processes the LINK_DOWN event.
618 *
619 * @param affectedLink Link that is going down
620 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100621 public void processLinkDown(Link affectedLink) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100622 lastMcastChange = Instant.now();
623 mcastLock();
624 try {
625 // Get groups affected by the link down event
626 getAffectedGroups(affectedLink).forEach(mcastIp -> {
627 // TODO Optimize when the group editing is in place
628 log.debug("Processing link down {} for group {}",
629 affectedLink, mcastIp);
Pier Luigi580fd8a2018-01-16 10:47:50 +0100630
Pier Luigi35dab3f2018-01-25 16:16:02 +0100631 // Find out the ingress, transit and egress device of affected group
Pier979e61a2018-03-07 11:42:50 +0100632 DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
Pier Luigi35dab3f2018-01-25 16:16:02 +0100633 .stream().findAny().orElse(null);
Pier1a7e0c02018-03-12 15:00:54 -0700634 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT);
Pier979e61a2018-03-07 11:42:50 +0100635 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100636 ConnectPoint source = getSource(mcastIp);
Charles Chana8f9dee2016-05-16 18:44:13 -0700637
Pier1a7e0c02018-03-12 15:00:54 -0700638 // Do not proceed if ingress device or source of this group are missing
639 // If sinks are in other leafs, we have ingress, transit, egress, and source
640 // If sinks are in the same leaf, we have just ingress and source
641 if (ingressDevice == null || source == null) {
642 log.warn("Missing ingress {} or source {} for group {}",
643 ingressDevice, source, mcastIp);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100644 return;
Charles Chan72779502016-04-23 17:36:10 -0700645 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100646
647 // Continue only when this instance is the master of source device
648 if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
649 log.debug("Skip {} due to lack of mastership of the source device {}",
Pier1f87aca2018-03-14 16:47:32 -0700650 mcastIp, source.deviceId());
Pier Luigi35dab3f2018-01-25 16:16:02 +0100651 return;
652 }
653
654 // Remove entire transit
Pier1a7e0c02018-03-12 15:00:54 -0700655 transitDevices.forEach(transitDevice ->
656 removeGroupFromDevice(transitDevice, mcastIp, assignedVlan(null)));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100657
Pier1a7e0c02018-03-12 15:00:54 -0700658 // Remove transit-facing ports on the ingress device
659 removeIngressTransitPorts(mcastIp, ingressDevice, source);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100660
Pier1f87aca2018-03-14 16:47:32 -0700661 // Compute mcast tree for the the egress devices
662 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(ingressDevice, egressDevices);
663
Pier Luigi35dab3f2018-01-25 16:16:02 +0100664 // Construct a new path for each egress device
Pier1f87aca2018-03-14 16:47:32 -0700665 mcastTree.forEach((egressDevice, paths) -> {
666 // We try to enforce the sinks path on the mcast tree
667 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
668 mcastIp, paths);
669 // If a path is present, let's install it
Pier Luigi35dab3f2018-01-25 16:16:02 +0100670 if (mcastPath.isPresent()) {
671 installPath(mcastIp, source, mcastPath.get());
672 } else {
673 log.warn("Fail to recover egress device {} from link failure {}",
674 egressDevice, affectedLink);
675 removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null));
676 }
677 });
Charles Chan72779502016-04-23 17:36:10 -0700678 });
Pier Luigi35dab3f2018-01-25 16:16:02 +0100679 } finally {
680 mcastUnlock();
681 }
Charles Chan72779502016-04-23 17:36:10 -0700682 }
683
684 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +0100685 * Process the DEVICE_DOWN event.
686 *
687 * @param deviceDown device going down
688 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100689 public void processDeviceDown(DeviceId deviceDown) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100690 lastMcastChange = Instant.now();
691 mcastLock();
692 try {
693 // Get the mcast groups affected by the device going down
694 getAffectedGroups(deviceDown).forEach(mcastIp -> {
695 // TODO Optimize when the group editing is in place
696 log.debug("Processing device down {} for group {}",
697 deviceDown, mcastIp);
Pier Luigi580fd8a2018-01-16 10:47:50 +0100698
Pier Luigi35dab3f2018-01-25 16:16:02 +0100699 // Find out the ingress, transit and egress device of affected group
Pier979e61a2018-03-07 11:42:50 +0100700 DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
Pier Luigi35dab3f2018-01-25 16:16:02 +0100701 .stream().findAny().orElse(null);
Pier1a7e0c02018-03-12 15:00:54 -0700702 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT);
Pier979e61a2018-03-07 11:42:50 +0100703 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100704 ConnectPoint source = getSource(mcastIp);
Pier Luigi580fd8a2018-01-16 10:47:50 +0100705
Pier Luigi35dab3f2018-01-25 16:16:02 +0100706 // Do not proceed if ingress device or source of this group are missing
707 // If sinks are in other leafs, we have ingress, transit, egress, and source
708 // If sinks are in the same leaf, we have just ingress and source
709 if (ingressDevice == null || source == null) {
710 log.warn("Missing ingress {} or source {} for group {}",
711 ingressDevice, source, mcastIp);
Pier Luigi580fd8a2018-01-16 10:47:50 +0100712 return;
713 }
Pier Luigi580fd8a2018-01-16 10:47:50 +0100714
Pier Luigi6786b922018-02-02 16:19:11 +0100715 // Verify leadership on the operation
716 if (!isLeader(source)) {
717 log.debug("Skip {} due to lack of leadership", mcastIp);
718 return;
Pier Luigi580fd8a2018-01-16 10:47:50 +0100719 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100720
721 // If it exists, we have to remove it in any case
Pier1a7e0c02018-03-12 15:00:54 -0700722 if (!transitDevices.isEmpty()) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100723 // Remove entire transit
Pier1a7e0c02018-03-12 15:00:54 -0700724 transitDevices.forEach(transitDevice ->
725 removeGroupFromDevice(transitDevice, mcastIp, assignedVlan(null)));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100726 }
727 // If the ingress is down
728 if (ingressDevice.equals(deviceDown)) {
729 // Remove entire ingress
730 removeGroupFromDevice(ingressDevice, mcastIp, assignedVlan(source));
731 // If other sinks different from the ingress exist
732 if (!egressDevices.isEmpty()) {
733 // Remove all the remaining egress
734 egressDevices.forEach(
735 egressDevice -> removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null))
736 );
Pier Luigi580fd8a2018-01-16 10:47:50 +0100737 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100738 } else {
739 // Egress or transit could be down at this point
Pier1a7e0c02018-03-12 15:00:54 -0700740 // Get the ingress-transit ports if they exist
741 removeIngressTransitPorts(mcastIp, ingressDevice, source);
742
Pier Luigi35dab3f2018-01-25 16:16:02 +0100743 // One of the egress device is down
744 if (egressDevices.contains(deviceDown)) {
745 // Remove entire device down
746 removeGroupFromDevice(deviceDown, mcastIp, assignedVlan(null));
747 // Remove the device down from egress
748 egressDevices.remove(deviceDown);
749 // If there are no more egress and ingress does not have sinks
750 if (egressDevices.isEmpty() && !hasSinks(ingressDevice, mcastIp)) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100751 // We have done
752 return;
753 }
754 }
Pier1f87aca2018-03-14 16:47:32 -0700755
756 // Compute mcast tree for the the egress devices
757 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(ingressDevice, egressDevices);
758
Pier Luigi35dab3f2018-01-25 16:16:02 +0100759 // Construct a new path for each egress device
Pier1f87aca2018-03-14 16:47:32 -0700760 mcastTree.forEach((egressDevice, paths) -> {
761 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
762 mcastIp, null);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100763 // If there is a new path
764 if (mcastPath.isPresent()) {
765 // Let's install the new mcast path for this egress
766 installPath(mcastIp, source, mcastPath.get());
767 } else {
768 // We were not able to find an alternative path for this egress
769 log.warn("Fail to recover egress device {} from device down {}",
770 egressDevice, deviceDown);
771 removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null));
772 }
773 });
774 }
775 });
776 } finally {
777 mcastUnlock();
778 }
Pier Luigi580fd8a2018-01-16 10:47:50 +0100779 }
780
781 /**
Pier1a7e0c02018-03-12 15:00:54 -0700782 * Utility method to remove all the ingress transit ports.
783 *
784 * @param mcastIp the group ip
785 * @param ingressDevice the ingress device for this group
786 * @param source the source connect point
787 */
788 private void removeIngressTransitPorts(IpAddress mcastIp, DeviceId ingressDevice,
789 ConnectPoint source) {
790 Set<PortNumber> ingressTransitPorts = ingressTransitPort(mcastIp);
791 ingressTransitPorts.forEach(ingressTransitPort -> {
792 if (ingressTransitPort != null) {
793 boolean isLast = removePortFromDevice(ingressDevice, ingressTransitPort,
794 mcastIp, assignedVlan(source));
795 if (isLast) {
796 mcastRoleStore.remove(new McastStoreKey(mcastIp, ingressDevice));
797 }
798 }
799 });
800 }
801
802 /**
Charles Chanc91c8782016-03-30 17:54:24 -0700803 * Adds filtering objective for given device and port.
804 *
805 * @param deviceId device ID
806 * @param port ingress port number
807 * @param assignedVlan assigned VLAN ID
808 */
Pier979e61a2018-03-07 11:42:50 +0100809 private void addFilterToDevice(DeviceId deviceId, PortNumber port,
810 VlanId assignedVlan, IpAddress mcastIp, McastRole mcastRole) {
Charles Chanc91c8782016-03-30 17:54:24 -0700811 // Do nothing if the port is configured as suppressed
Charles Chan370a65b2016-05-10 17:29:47 -0700812 ConnectPoint connectPoint = new ConnectPoint(deviceId, port);
813 SegmentRoutingAppConfig appConfig = srManager.cfgService
Pier Luigi69f774d2018-02-28 12:10:50 +0100814 .getConfig(srManager.appId(), SegmentRoutingAppConfig.class);
Charles Chan370a65b2016-05-10 17:29:47 -0700815 if (appConfig != null && appConfig.suppressSubnet().contains(connectPoint)) {
816 log.info("Ignore suppressed port {}", connectPoint);
Charles Chanc91c8782016-03-30 17:54:24 -0700817 return;
818 }
819
Charles Chanf909e5b2018-03-02 13:26:22 -0800820 MacAddress routerMac;
821 try {
822 routerMac = srManager.deviceConfiguration().getDeviceMac(deviceId);
823 } catch (DeviceConfigNotFoundException dcnfe) {
824 log.warn("Fail to push filtering objective since device is not configured. Abort");
825 return;
826 }
827
Charles Chanc91c8782016-03-30 17:54:24 -0700828 FilteringObjective.Builder filtObjBuilder =
Pier979e61a2018-03-07 11:42:50 +0100829 filterObjBuilder(port, assignedVlan, mcastIp, routerMac, mcastRole);
Charles Chan72779502016-04-23 17:36:10 -0700830 ObjectiveContext context = new DefaultObjectiveContext(
831 (objective) -> log.debug("Successfully add filter on {}/{}, vlan {}",
Charles Chan10b0fb72017-02-02 16:20:42 -0800832 deviceId, port.toLong(), assignedVlan),
Charles Chan72779502016-04-23 17:36:10 -0700833 (objective, error) ->
834 log.warn("Failed to add filter on {}/{}, vlan {}: {}",
Charles Chan10b0fb72017-02-02 16:20:42 -0800835 deviceId, port.toLong(), assignedVlan, error));
Charles Chan72779502016-04-23 17:36:10 -0700836 srManager.flowObjectiveService.filter(deviceId, filtObjBuilder.add(context));
Charles Chanc91c8782016-03-30 17:54:24 -0700837 }
838
839 /**
840 * Adds a port to given multicast group on given device. This involves the
841 * update of L3 multicast group and multicast routing table entry.
842 *
843 * @param deviceId device ID
844 * @param port port to be added
845 * @param mcastIp multicast group
846 * @param assignedVlan assigned VLAN ID
847 */
848 private void addPortToDevice(DeviceId deviceId, PortNumber port,
849 IpAddress mcastIp, VlanId assignedVlan) {
Charles Chan72779502016-04-23 17:36:10 -0700850 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId);
Charles Chanc91c8782016-03-30 17:54:24 -0700851 ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
Pier Luigi4f0dd212018-01-19 10:24:53 +0100852 NextObjective newNextObj;
Charles Chan72779502016-04-23 17:36:10 -0700853 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Charles Chanc91c8782016-03-30 17:54:24 -0700854 // First time someone request this mcast group via this device
855 portBuilder.add(port);
Pier Luigi4f0dd212018-01-19 10:24:53 +0100856 // New nextObj
857 newNextObj = nextObjBuilder(mcastIp, assignedVlan,
858 portBuilder.build(), null).add();
859 // Store the new port
860 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Charles Chanc91c8782016-03-30 17:54:24 -0700861 } else {
862 // This device already serves some subscribers of this mcast group
Charles Chan72779502016-04-23 17:36:10 -0700863 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chanc91c8782016-03-30 17:54:24 -0700864 // Stop if the port is already in the nextobj
865 Set<PortNumber> existingPorts = getPorts(nextObj.next());
866 if (existingPorts.contains(port)) {
867 log.info("NextObj for {}/{} already exists. Abort", deviceId, port);
868 return;
869 }
Pier Luigi4f0dd212018-01-19 10:24:53 +0100870 // Let's add the port and reuse the previous one
Yuta HIGUCHIbef07b52018-02-09 18:05:23 -0800871 portBuilder.addAll(existingPorts).add(port);
Pier Luigi4f0dd212018-01-19 10:24:53 +0100872 // Reuse previous nextObj
873 newNextObj = nextObjBuilder(mcastIp, assignedVlan,
874 portBuilder.build(), nextObj.id()).addToExisting();
875 // Store the final next objective and send only the difference to the driver
876 mcastNextObjStore.put(mcastStoreKey, newNextObj);
877 // Add just the new port
878 portBuilder = ImmutableSet.builder();
879 portBuilder.add(port);
880 newNextObj = nextObjBuilder(mcastIp, assignedVlan,
881 portBuilder.build(), nextObj.id()).addToExisting();
Charles Chanc91c8782016-03-30 17:54:24 -0700882 }
883 // Create, store and apply the new nextObj and fwdObj
Charles Chan72779502016-04-23 17:36:10 -0700884 ObjectiveContext context = new DefaultObjectiveContext(
885 (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}",
886 mcastIp, deviceId, port.toLong(), assignedVlan),
887 (objective, error) ->
888 log.warn("Failed to add {} on {}/{}, vlan {}: {}",
889 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Charles Chanc91c8782016-03-30 17:54:24 -0700890 ForwardingObjective fwdObj =
Charles Chan72779502016-04-23 17:36:10 -0700891 fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add(context);
Charles Chanc91c8782016-03-30 17:54:24 -0700892 srManager.flowObjectiveService.next(deviceId, newNextObj);
893 srManager.flowObjectiveService.forward(deviceId, fwdObj);
Charles Chanc91c8782016-03-30 17:54:24 -0700894 }
895
896 /**
897 * Removes a port from given multicast group on given device.
898 * This involves the update of L3 multicast group and multicast routing
899 * table entry.
900 *
901 * @param deviceId device ID
902 * @param port port to be added
903 * @param mcastIp multicast group
904 * @param assignedVlan assigned VLAN ID
905 * @return true if this is the last sink on this device
906 */
907 private boolean removePortFromDevice(DeviceId deviceId, PortNumber port,
908 IpAddress mcastIp, VlanId assignedVlan) {
Charles Chan72779502016-04-23 17:36:10 -0700909 McastStoreKey mcastStoreKey =
910 new McastStoreKey(mcastIp, deviceId);
Charles Chanc91c8782016-03-30 17:54:24 -0700911 // This device is not serving this multicast group
Charles Chan72779502016-04-23 17:36:10 -0700912 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Charles Chanc91c8782016-03-30 17:54:24 -0700913 log.warn("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
914 return false;
915 }
Charles Chan72779502016-04-23 17:36:10 -0700916 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chanc91c8782016-03-30 17:54:24 -0700917
918 Set<PortNumber> existingPorts = getPorts(nextObj.next());
Charles Chan72779502016-04-23 17:36:10 -0700919 // This port does not serve this multicast group
Charles Chanc91c8782016-03-30 17:54:24 -0700920 if (!existingPorts.contains(port)) {
921 log.warn("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
922 return false;
923 }
924 // Copy and modify the ImmutableSet
925 existingPorts = Sets.newHashSet(existingPorts);
926 existingPorts.remove(port);
927
928 NextObjective newNextObj;
Pier Luigi8cd46de2018-01-19 10:24:53 +0100929 ObjectiveContext context;
Charles Chanc91c8782016-03-30 17:54:24 -0700930 ForwardingObjective fwdObj;
931 if (existingPorts.isEmpty()) {
Pier Luigi8cd46de2018-01-19 10:24:53 +0100932 // If this is the last sink, remove flows and last bucket
Charles Chanc91c8782016-03-30 17:54:24 -0700933 // NOTE: Rely on GroupStore garbage collection rather than explicitly
934 // remove L3MG since there might be other flows/groups refer to
935 // the same L2IG
Pier Luigi8cd46de2018-01-19 10:24:53 +0100936 context = new DefaultObjectiveContext(
Charles Chan72779502016-04-23 17:36:10 -0700937 (objective) -> log.debug("Successfully remove {} on {}/{}, vlan {}",
938 mcastIp, deviceId, port.toLong(), assignedVlan),
939 (objective, error) ->
940 log.warn("Failed to remove {} on {}/{}, vlan {}: {}",
941 mcastIp, deviceId, port.toLong(), assignedVlan, error));
942 fwdObj = fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
943 mcastNextObjStore.remove(mcastStoreKey);
Charles Chanc91c8782016-03-30 17:54:24 -0700944 } else {
945 // If this is not the last sink, update flows and groups
Pier Luigi8cd46de2018-01-19 10:24:53 +0100946 context = new DefaultObjectiveContext(
Charles Chan72779502016-04-23 17:36:10 -0700947 (objective) -> log.debug("Successfully update {} on {}/{}, vlan {}",
948 mcastIp, deviceId, port.toLong(), assignedVlan),
949 (objective, error) ->
950 log.warn("Failed to update {} on {}/{}, vlan {}: {}",
951 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Pier Luigi8cd46de2018-01-19 10:24:53 +0100952 // Here we store the next objective with the remaining port
953 newNextObj = nextObjBuilder(mcastIp, assignedVlan,
954 existingPorts, nextObj.id()).removeFromExisting();
Charles Chan82f19972016-05-17 13:13:55 -0700955 fwdObj = fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add(context);
Charles Chan72779502016-04-23 17:36:10 -0700956 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Charles Chanc91c8782016-03-30 17:54:24 -0700957 }
Pier Luigi8cd46de2018-01-19 10:24:53 +0100958 // Let's modify the next objective removing the bucket
959 newNextObj = nextObjBuilder(mcastIp, assignedVlan,
960 ImmutableSet.of(port), nextObj.id()).removeFromExisting();
961 srManager.flowObjectiveService.next(deviceId, newNextObj);
962 srManager.flowObjectiveService.forward(deviceId, fwdObj);
Charles Chanc91c8782016-03-30 17:54:24 -0700963 return existingPorts.isEmpty();
964 }
965
Charles Chan72779502016-04-23 17:36:10 -0700966 /**
967 * Removes entire group on given device.
968 *
969 * @param deviceId device ID
970 * @param mcastIp multicast group to be removed
971 * @param assignedVlan assigned VLAN ID
972 */
973 private void removeGroupFromDevice(DeviceId deviceId, IpAddress mcastIp,
974 VlanId assignedVlan) {
975 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId);
976 // This device is not serving this multicast group
977 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
978 log.warn("{} is not serving {}. Abort.", deviceId, mcastIp);
979 return;
980 }
981 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
982 // NOTE: Rely on GroupStore garbage collection rather than explicitly
983 // remove L3MG since there might be other flows/groups refer to
984 // the same L2IG
985 ObjectiveContext context = new DefaultObjectiveContext(
986 (objective) -> log.debug("Successfully remove {} on {}, vlan {}",
987 mcastIp, deviceId, assignedVlan),
988 (objective, error) ->
989 log.warn("Failed to remove {} on {}, vlan {}: {}",
990 mcastIp, deviceId, assignedVlan, error));
991 ForwardingObjective fwdObj = fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
992 srManager.flowObjectiveService.forward(deviceId, fwdObj);
993 mcastNextObjStore.remove(mcastStoreKey);
994 mcastRoleStore.remove(mcastStoreKey);
995 }
996
Pier Luigi580fd8a2018-01-16 10:47:50 +0100997 private void installPath(IpAddress mcastIp, ConnectPoint source, Path mcastPath) {
998 // Get Links
999 List<Link> links = mcastPath.links();
Pier1a7e0c02018-03-12 15:00:54 -07001000
1001 // Setup new ingress mcast role
1002 mcastRoleStore.put(new McastStoreKey(mcastIp, links.get(0).src().deviceId()),
1003 INGRESS);
1004
Pier Luigi580fd8a2018-01-16 10:47:50 +01001005 // For each link, modify the next on the source device adding the src port
1006 // and a new filter objective on the destination port
1007 links.forEach(link -> {
1008 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
1009 assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
Pier979e61a2018-03-07 11:42:50 +01001010 addFilterToDevice(link.dst().deviceId(), link.dst().port(),
1011 assignedVlan(null), mcastIp, null);
Pier Luigi580fd8a2018-01-16 10:47:50 +01001012 });
Pier1a7e0c02018-03-12 15:00:54 -07001013
1014 // Setup mcast role for the transit
1015 links.stream()
1016 .filter(link -> !link.src().deviceId().equals(source.deviceId()))
1017 .forEach(link -> mcastRoleStore.put(new McastStoreKey(mcastIp, link.src().deviceId()),
1018 TRANSIT));
Charles Chan72779502016-04-23 17:36:10 -07001019 }
1020
Charles Chanc91c8782016-03-30 17:54:24 -07001021 /**
1022 * Creates a next objective builder for multicast.
1023 *
1024 * @param mcastIp multicast group
1025 * @param assignedVlan assigned VLAN ID
1026 * @param outPorts set of output port numbers
1027 * @return next objective builder
1028 */
1029 private NextObjective.Builder nextObjBuilder(IpAddress mcastIp,
Pier Luigi4f0dd212018-01-19 10:24:53 +01001030 VlanId assignedVlan, Set<PortNumber> outPorts, Integer nextId) {
1031 // If nextId is null allocate a new one
1032 if (nextId == null) {
1033 nextId = srManager.flowObjectiveService.allocateNextId();
1034 }
Charles Chanc91c8782016-03-30 17:54:24 -07001035
1036 TrafficSelector metadata =
1037 DefaultTrafficSelector.builder()
1038 .matchVlanId(assignedVlan)
1039 .matchIPDst(mcastIp.toIpPrefix())
1040 .build();
1041
1042 NextObjective.Builder nextObjBuilder = DefaultNextObjective
1043 .builder().withId(nextId)
Pier Luigi69f774d2018-02-28 12:10:50 +01001044 .withType(NextObjective.Type.BROADCAST).fromApp(srManager.appId())
Charles Chanc91c8782016-03-30 17:54:24 -07001045 .withMeta(metadata);
1046
1047 outPorts.forEach(port -> {
1048 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
1049 if (egressVlan().equals(VlanId.NONE)) {
1050 tBuilder.popVlan();
1051 }
1052 tBuilder.setOutput(port);
1053 nextObjBuilder.addTreatment(tBuilder.build());
1054 });
1055
1056 return nextObjBuilder;
1057 }
1058
1059 /**
1060 * Creates a forwarding objective builder for multicast.
1061 *
1062 * @param mcastIp multicast group
1063 * @param assignedVlan assigned VLAN ID
1064 * @param nextId next ID of the L3 multicast group
1065 * @return forwarding objective builder
1066 */
1067 private ForwardingObjective.Builder fwdObjBuilder(IpAddress mcastIp,
1068 VlanId assignedVlan, int nextId) {
1069 TrafficSelector.Builder sbuilder = DefaultTrafficSelector.builder();
Julia Fergusonf1d9c342017-08-10 18:15:24 +00001070 IpPrefix mcastPrefix = mcastIp.toIpPrefix();
1071
1072 if (mcastIp.isIp4()) {
1073 sbuilder.matchEthType(Ethernet.TYPE_IPV4);
1074 sbuilder.matchIPDst(mcastPrefix);
1075 } else {
1076 sbuilder.matchEthType(Ethernet.TYPE_IPV6);
1077 sbuilder.matchIPv6Dst(mcastPrefix);
1078 }
1079
1080
Charles Chanc91c8782016-03-30 17:54:24 -07001081 TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder();
1082 metabuilder.matchVlanId(assignedVlan);
1083
1084 ForwardingObjective.Builder fwdBuilder = DefaultForwardingObjective.builder();
1085 fwdBuilder.withSelector(sbuilder.build())
1086 .withMeta(metabuilder.build())
1087 .nextStep(nextId)
1088 .withFlag(ForwardingObjective.Flag.SPECIFIC)
Pier Luigi69f774d2018-02-28 12:10:50 +01001089 .fromApp(srManager.appId())
Charles Chanc91c8782016-03-30 17:54:24 -07001090 .withPriority(SegmentRoutingService.DEFAULT_PRIORITY);
1091 return fwdBuilder;
1092 }
1093
1094 /**
1095 * Creates a filtering objective builder for multicast.
1096 *
Charles Chanc91c8782016-03-30 17:54:24 -07001097 * @param ingressPort ingress port of the multicast stream
1098 * @param assignedVlan assigned VLAN ID
Charles Chanf909e5b2018-03-02 13:26:22 -08001099 * @param routerMac router MAC. This is carried in metadata and used from some switches that
1100 * need to put unicast entry before multicast entry in TMAC table.
Charles Chanc91c8782016-03-30 17:54:24 -07001101 * @return filtering objective builder
1102 */
Charles Chan958ce892018-03-02 15:41:41 -08001103 private FilteringObjective.Builder filterObjBuilder(PortNumber ingressPort,
Pier979e61a2018-03-07 11:42:50 +01001104 VlanId assignedVlan, IpAddress mcastIp, MacAddress routerMac, McastRole mcastRole) {
Charles Chanc91c8782016-03-30 17:54:24 -07001105 FilteringObjective.Builder filtBuilder = DefaultFilteringObjective.builder();
Pier979e61a2018-03-07 11:42:50 +01001106 // Let's add the in port matching and the priority
1107 filtBuilder.withKey(Criteria.matchInPort(ingressPort))
1108 .withPriority(SegmentRoutingService.DEFAULT_PRIORITY);
1109 // According to the mcast role we match on the proper vlan
1110 // If the role is null we are on the transit or on the egress
1111 if (mcastRole == null) {
1112 filtBuilder.addCondition(Criteria.matchVlanId(egressVlan()));
Julia Fergusonf1d9c342017-08-10 18:15:24 +00001113 } else {
Pier979e61a2018-03-07 11:42:50 +01001114 filtBuilder.addCondition(Criteria.matchVlanId(ingressVlan()));
Julia Fergusonf1d9c342017-08-10 18:15:24 +00001115 }
Pier979e61a2018-03-07 11:42:50 +01001116 // According to the IP type we set the proper match on the mac address
1117 if (mcastIp.isIp4()) {
1118 filtBuilder.addCondition(Criteria.matchEthDstMasked(MacAddress.IPV4_MULTICAST,
1119 MacAddress.IPV4_MULTICAST_MASK));
1120 } else {
1121 filtBuilder.addCondition(Criteria.matchEthDstMasked(MacAddress.IPV6_MULTICAST,
1122 MacAddress.IPV6_MULTICAST_MASK));
1123 }
1124 // We finally build the meta treatment
Charles Chan0932eca2016-06-28 16:50:13 -07001125 TrafficTreatment tt = DefaultTrafficTreatment.builder()
Charles Chanf909e5b2018-03-02 13:26:22 -08001126 .pushVlan().setVlanId(assignedVlan)
1127 .setEthDst(routerMac)
1128 .build();
Charles Chan0932eca2016-06-28 16:50:13 -07001129 filtBuilder.withMeta(tt);
Pier979e61a2018-03-07 11:42:50 +01001130 // Done, we return a permit filtering objective
Pier Luigi69f774d2018-02-28 12:10:50 +01001131 return filtBuilder.permit().fromApp(srManager.appId());
Charles Chanc91c8782016-03-30 17:54:24 -07001132 }
1133
1134 /**
1135 * Gets output ports information from treatments.
1136 *
1137 * @param treatments collection of traffic treatments
1138 * @return set of output port numbers
1139 */
1140 private Set<PortNumber> getPorts(Collection<TrafficTreatment> treatments) {
1141 ImmutableSet.Builder<PortNumber> builder = ImmutableSet.builder();
1142 treatments.forEach(treatment -> {
1143 treatment.allInstructions().stream()
1144 .filter(instr -> instr instanceof OutputInstruction)
1145 .forEach(instr -> {
1146 builder.add(((OutputInstruction) instr).port());
1147 });
1148 });
1149 return builder.build();
1150 }
1151
Pier1f87aca2018-03-14 16:47:32 -07001152 /**
1153 * Go through all the paths, looking for shared links to be used
1154 * in the final path computation.
1155 *
1156 * @param egresses egress devices
1157 * @param availablePaths all the available paths towards the egress
1158 * @return shared links between egress devices
1159 */
1160 private Set<Link> exploreMcastTree(Set<DeviceId> egresses,
1161 Map<DeviceId, List<Path>> availablePaths) {
1162 // Length of the shortest path
1163 int minLength = Integer.MAX_VALUE;
1164 int length;
1165 // Current paths
1166 List<Path> currentPaths;
1167 // Verify the source can still reach all the egresses
1168 for (DeviceId egress : egresses) {
1169 // From the source we cannot reach all the sinks
1170 // just continue and let's figured out after
1171 currentPaths = availablePaths.get(egress);
1172 if (currentPaths.isEmpty()) {
1173 continue;
1174 }
1175 // Get the length of the first one available,
1176 // update the min length and save the paths
1177 length = currentPaths.get(0).links().size();
1178 if (length < minLength) {
1179 minLength = length;
1180 }
Pier Luigi51ee7c02018-02-23 19:57:40 +01001181 }
Pier1f87aca2018-03-14 16:47:32 -07001182 // If there are no paths
1183 if (minLength == Integer.MAX_VALUE) {
1184 return Collections.emptySet();
1185 }
1186 // Iterate looking for shared links
1187 int index = 0;
1188 // Define the sets for the intersection
1189 Set<Link> sharedLinks = Sets.newHashSet();
1190 Set<Link> currentSharedLinks;
1191 Set<Link> currentLinks;
1192 DeviceId deviceToRemove = null;
1193 // Let's find out the shared links
1194 while (index < minLength) {
1195 // Initialize the intersection with the paths related to the first egress
1196 currentPaths = availablePaths.get(
1197 egresses.stream()
1198 .findFirst()
1199 .orElse(null)
1200 );
1201 currentSharedLinks = Sets.newHashSet();
1202 // Iterate over the paths and take the "index" links
1203 for (Path path : currentPaths) {
1204 currentSharedLinks.add(path.links().get(index));
1205 }
1206 // Iterate over the remaining egress
1207 for (DeviceId egress : egresses) {
1208 // Iterate over the paths and take the "index" links
1209 currentLinks = Sets.newHashSet();
1210 for (Path path : availablePaths.get(egress)) {
1211 currentLinks.add(path.links().get(index));
1212 }
1213 // Do intersection
1214 currentSharedLinks = Sets.intersection(currentSharedLinks, currentLinks);
1215 // If there are no shared paths exit and record the device to remove
1216 // we have to retry with a subset of sinks
1217 if (currentSharedLinks.isEmpty()) {
1218 deviceToRemove = egress;
1219 index = minLength;
1220 break;
1221 }
1222 }
1223 sharedLinks.addAll(currentSharedLinks);
1224 index++;
1225 }
1226 // If the shared links is empty and there are egress
1227 // let's retry another time with less sinks, we can
1228 // still build optimal subtrees
1229 if (sharedLinks.isEmpty() && egresses.size() > 1 && deviceToRemove != null) {
1230 egresses.remove(deviceToRemove);
1231 sharedLinks = exploreMcastTree(egresses, availablePaths);
1232 }
1233 return sharedLinks;
1234 }
1235
1236 /**
1237 * Build Mcast tree having as root the given source and as leaves the given egress points.
1238 *
1239 * @param source source of the tree
1240 * @param sinks leaves of the tree
1241 * @return the computed Mcast tree
1242 */
1243 private Map<ConnectPoint, List<Path>> computeSinkMcastTree(DeviceId source,
1244 Set<ConnectPoint> sinks) {
1245 // Get the egress devices, remove source from the egress if present
1246 Set<DeviceId> egresses = sinks.stream()
1247 .map(ConnectPoint::deviceId)
1248 .filter(deviceId -> !deviceId.equals(source))
1249 .collect(Collectors.toSet());
1250 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(source, egresses);
1251 // Build final tree nad return it as it is
1252 final Map<ConnectPoint, List<Path>> finalTree = Maps.newHashMap();
1253 mcastTree.forEach((egress, paths) ->
1254 sinks.stream().filter(sink -> sink.deviceId().equals(egress))
1255 .forEach(sink -> finalTree.put(sink, mcastTree.get(sink.deviceId()))));
1256 return finalTree;
1257 }
1258
1259 /**
1260 * Build Mcast tree having as root the given source and as leaves the given egress.
1261 *
1262 * @param source source of the tree
1263 * @param egresses leaves of the tree
1264 * @return the computed Mcast tree
1265 */
1266 private Map<DeviceId, List<Path>> computeMcastTree(DeviceId source,
1267 Set<DeviceId> egresses) {
1268 // Pre-compute all the paths
1269 Map<DeviceId, List<Path>> availablePaths = Maps.newHashMap();
1270 // No links to enforce
1271 egresses.forEach(egress -> availablePaths.put(egress, getPaths(source, egress,
1272 Collections.emptySet())));
1273 // Explore the topology looking for shared links amongst the egresses
1274 Set<Link> linksToEnforce = exploreMcastTree(Sets.newHashSet(egresses), availablePaths);
1275 // Remove all the paths from the previous computation
1276 availablePaths.clear();
1277 // Build the final paths enforcing the shared links between egress devices
1278 egresses.forEach(egress -> availablePaths.put(egress, getPaths(source, egress,
1279 linksToEnforce)));
1280 return availablePaths;
1281 }
1282
1283 /**
1284 * Gets path from src to dst computed using the custom link weigher.
1285 *
1286 * @param src source device ID
1287 * @param dst destination device ID
1288 * @return list of paths from src to dst
1289 */
1290 private List<Path> getPaths(DeviceId src, DeviceId dst, Set<Link> linksToEnforce) {
1291 // Takes a snapshot of the topology
1292 final Topology currentTopology = topologyService.currentTopology();
1293 // Build a specific link weigher for this path computation
1294 final LinkWeigher linkWeigher = new SRLinkWeigher(srManager, src, linksToEnforce);
1295 // We will use our custom link weigher for our path
1296 // computations and build the list of valid paths
1297 List<Path> allPaths = Lists.newArrayList(
1298 topologyService.getPaths(currentTopology, src, dst, linkWeigher)
1299 );
1300 // If there are no valid paths, just exit
1301 log.debug("{} path(s) found from {} to {}", allPaths.size(), src, dst);
1302 return allPaths;
Pier Luigi51ee7c02018-02-23 19:57:40 +01001303 }
1304
Charles Chanc91c8782016-03-30 17:54:24 -07001305 /**
1306 * Gets a path from src to dst.
1307 * If a path was allocated before, returns the allocated path.
1308 * Otherwise, randomly pick one from available paths.
1309 *
1310 * @param src source device ID
1311 * @param dst destination device ID
1312 * @param mcastIp multicast group
Pier1f87aca2018-03-14 16:47:32 -07001313 * @param allPaths paths list
Charles Chanc91c8782016-03-30 17:54:24 -07001314 * @return an optional path from src to dst
1315 */
Pier1f87aca2018-03-14 16:47:32 -07001316 private Optional<Path> getPath(DeviceId src, DeviceId dst,
1317 IpAddress mcastIp, List<Path> allPaths) {
1318 // Firstly we get all the valid paths, if the supplied are null
1319 if (allPaths == null) {
1320 allPaths = getPaths(src, dst, Collections.emptySet());
1321 }
1322
1323 // If there are no paths just exit
Charles Chanc91c8782016-03-30 17:54:24 -07001324 if (allPaths.isEmpty()) {
Charles Chanc91c8782016-03-30 17:54:24 -07001325 return Optional.empty();
1326 }
1327
Pier Luigi91573e12018-01-23 16:06:38 +01001328 // Create a map index of suitablity-to-list of paths. For example
1329 // a path in the list associated to the index 1 shares only the
1330 // first hop and it is less suitable of a path belonging to the index
1331 // 2 that shares leaf-spine.
1332 Map<Integer, List<Path>> eligiblePaths = Maps.newHashMap();
1333 // Some init steps
1334 int nhop;
1335 McastStoreKey mcastStoreKey;
1336 Link hop;
1337 PortNumber srcPort;
1338 Set<PortNumber> existingPorts;
1339 NextObjective nextObj;
1340 // Iterate over paths looking for eligible paths
1341 for (Path path : allPaths) {
1342 // Unlikely, it will happen...
1343 if (!src.equals(path.links().get(0).src().deviceId())) {
1344 continue;
1345 }
1346 nhop = 0;
1347 // Iterate over the links
1348 while (nhop < path.links().size()) {
1349 // Get the link and verify if a next related
1350 // to the src device exist in the store
1351 hop = path.links().get(nhop);
1352 mcastStoreKey = new McastStoreKey(mcastIp, hop.src().deviceId());
1353 // It does not exist in the store, exit
1354 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
1355 break;
Charles Chanc91c8782016-03-30 17:54:24 -07001356 }
Pier Luigi91573e12018-01-23 16:06:38 +01001357 // Get the output ports on the next
1358 nextObj = mcastNextObjStore.get(mcastStoreKey).value();
1359 existingPorts = getPorts(nextObj.next());
1360 // And the src port on the link
1361 srcPort = hop.src().port();
1362 // the src port is not used as output, exit
1363 if (!existingPorts.contains(srcPort)) {
1364 break;
1365 }
1366 nhop++;
1367 }
1368 // n_hop defines the index
1369 if (nhop > 0) {
1370 eligiblePaths.compute(nhop, (index, paths) -> {
1371 paths = paths == null ? Lists.newArrayList() : paths;
1372 paths.add(path);
1373 return paths;
1374 });
Charles Chanc91c8782016-03-30 17:54:24 -07001375 }
1376 }
Pier Luigi91573e12018-01-23 16:06:38 +01001377
1378 // No suitable paths
1379 if (eligiblePaths.isEmpty()) {
1380 log.debug("No eligiblePath(s) found from {} to {}", src, dst);
1381 // Otherwise, randomly pick a path
1382 Collections.shuffle(allPaths);
1383 return allPaths.stream().findFirst();
1384 }
1385
1386 // Let's take the best ones
1387 Integer bestIndex = eligiblePaths.keySet()
1388 .stream()
1389 .sorted(Comparator.reverseOrder())
1390 .findFirst().orElse(null);
1391 List<Path> bestPaths = eligiblePaths.get(bestIndex);
1392 log.debug("{} eligiblePath(s) found from {} to {}",
1393 bestPaths.size(), src, dst);
1394 // randomly pick a path on the highest index
1395 Collections.shuffle(bestPaths);
1396 return bestPaths.stream().findFirst();
Charles Chanc91c8782016-03-30 17:54:24 -07001397 }
1398
1399 /**
Charles Chan72779502016-04-23 17:36:10 -07001400 * Gets device(s) of given role in given multicast group.
1401 *
1402 * @param mcastIp multicast IP
1403 * @param role multicast role
1404 * @return set of device ID or empty set if not found
1405 */
1406 private Set<DeviceId> getDevice(IpAddress mcastIp, McastRole role) {
1407 return mcastRoleStore.entrySet().stream()
1408 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
1409 entry.getValue().value() == role)
Pier1f87aca2018-03-14 16:47:32 -07001410 .map(Entry::getKey).map(McastStoreKey::deviceId)
Charles Chan72779502016-04-23 17:36:10 -07001411 .collect(Collectors.toSet());
1412 }
1413
1414 /**
Charles Chana8f9dee2016-05-16 18:44:13 -07001415 * Gets source connect point of given multicast group.
1416 *
1417 * @param mcastIp multicast IP
1418 * @return source connect point or null if not found
1419 */
Pier1f87aca2018-03-14 16:47:32 -07001420 // FIXME To be addressed with multiple sources support
Charles Chana8f9dee2016-05-16 18:44:13 -07001421 private ConnectPoint getSource(IpAddress mcastIp) {
Pier1f87aca2018-03-14 16:47:32 -07001422 // FIXME we should support different types of routes
1423 McastRoute mcastRoute = srManager.multicastRouteService.getRoutes().stream()
1424 .filter(mcastRouteInternal -> mcastRouteInternal.group().equals(mcastIp))
1425 .findFirst().orElse(null);
1426 return mcastRoute == null ? null : srManager.multicastRouteService.sources(mcastRoute)
1427 .stream()
1428 .findFirst().orElse(null);
Charles Chana8f9dee2016-05-16 18:44:13 -07001429 }
Pier Luigi92e69be2018-03-02 12:53:37 +01001430 /**
1431 * Gets sinks of given multicast group.
1432 *
1433 * @param mcastIp multicast IP
1434 * @return set of sinks or empty set if not found
1435 */
1436 private Set<ConnectPoint> getSinks(IpAddress mcastIp) {
Pier1f87aca2018-03-14 16:47:32 -07001437 // FIXME we should support different types of routes
1438 McastRoute mcastRoute = srManager.multicastRouteService.getRoutes().stream()
1439 .filter(mcastRouteInternal -> mcastRouteInternal.group().equals(mcastIp))
1440 .findFirst().orElse(null);
1441 return mcastRoute == null ?
1442 Collections.emptySet() : srManager.multicastRouteService.sinks(mcastRoute);
Pier Luigi92e69be2018-03-02 12:53:37 +01001443 }
Charles Chana8f9dee2016-05-16 18:44:13 -07001444
1445 /**
Charles Chan72779502016-04-23 17:36:10 -07001446 * Gets groups which is affected by the link down event.
1447 *
1448 * @param link link going down
1449 * @return a set of multicast IpAddress
1450 */
1451 private Set<IpAddress> getAffectedGroups(Link link) {
1452 DeviceId deviceId = link.src().deviceId();
1453 PortNumber port = link.src().port();
1454 return mcastNextObjStore.entrySet().stream()
1455 .filter(entry -> entry.getKey().deviceId().equals(deviceId) &&
1456 getPorts(entry.getValue().value().next()).contains(port))
Pier1f87aca2018-03-14 16:47:32 -07001457 .map(Entry::getKey).map(McastStoreKey::mcastIp)
Charles Chan72779502016-04-23 17:36:10 -07001458 .collect(Collectors.toSet());
1459 }
1460
1461 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +01001462 * Gets groups which are affected by the device down event.
1463 *
1464 * @param deviceId device going down
1465 * @return a set of multicast IpAddress
1466 */
1467 private Set<IpAddress> getAffectedGroups(DeviceId deviceId) {
1468 return mcastNextObjStore.entrySet().stream()
1469 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
Pier1f87aca2018-03-14 16:47:32 -07001470 .map(Entry::getKey).map(McastStoreKey::mcastIp)
Pier Luigi580fd8a2018-01-16 10:47:50 +01001471 .collect(Collectors.toSet());
1472 }
1473
1474 /**
Pier979e61a2018-03-07 11:42:50 +01001475 * Gets ingress VLAN from McastConfig.
1476 *
1477 * @return ingress VLAN or VlanId.NONE if not configured
1478 */
1479 private VlanId ingressVlan() {
1480 McastConfig mcastConfig =
1481 srManager.cfgService.getConfig(coreAppId, McastConfig.class);
1482 return (mcastConfig != null) ? mcastConfig.ingressVlan() : VlanId.NONE;
1483 }
1484
1485 /**
Charles Chanc91c8782016-03-30 17:54:24 -07001486 * Gets egress VLAN from McastConfig.
1487 *
1488 * @return egress VLAN or VlanId.NONE if not configured
1489 */
1490 private VlanId egressVlan() {
1491 McastConfig mcastConfig =
1492 srManager.cfgService.getConfig(coreAppId, McastConfig.class);
1493 return (mcastConfig != null) ? mcastConfig.egressVlan() : VlanId.NONE;
1494 }
1495
1496 /**
1497 * Gets assigned VLAN according to the value of egress VLAN.
Charles Chana8f9dee2016-05-16 18:44:13 -07001498 * If connect point is specified, try to reuse the assigned VLAN on the connect point.
Charles Chanc91c8782016-03-30 17:54:24 -07001499 *
Charles Chana8f9dee2016-05-16 18:44:13 -07001500 * @param cp connect point; Can be null if not specified
1501 * @return assigned VLAN ID
Charles Chanc91c8782016-03-30 17:54:24 -07001502 */
Charles Chana8f9dee2016-05-16 18:44:13 -07001503 private VlanId assignedVlan(ConnectPoint cp) {
1504 // Use the egressVlan if it is tagged
1505 if (!egressVlan().equals(VlanId.NONE)) {
1506 return egressVlan();
1507 }
1508 // Reuse unicast VLAN if the port has subnet configured
1509 if (cp != null) {
Charles Chanf9759582017-10-20 19:09:16 -07001510 VlanId untaggedVlan = srManager.getInternalVlanId(cp);
Charles Chan10b0fb72017-02-02 16:20:42 -08001511 return (untaggedVlan != null) ? untaggedVlan : INTERNAL_VLAN;
Charles Chana8f9dee2016-05-16 18:44:13 -07001512 }
Charles Chan10b0fb72017-02-02 16:20:42 -08001513 // Use DEFAULT_VLAN if none of the above matches
1514 return SegmentRoutingManager.INTERNAL_VLAN;
Charles Chanc91c8782016-03-30 17:54:24 -07001515 }
Charles Chan72779502016-04-23 17:36:10 -07001516
1517 /**
Pier Luigid29ca7c2018-02-28 17:24:03 +01001518 * Gets assigned VLAN according to the value in the meta.
1519 *
1520 * @param nextObjective nextObjective to analyze
1521 * @return assigned VLAN ID
1522 */
1523 private VlanId assignedVlanFromNext(NextObjective nextObjective) {
1524 return ((VlanIdCriterion) nextObjective.meta().getCriterion(VLAN_VID)).vlanId();
1525 }
1526
1527 /**
Charles Chan72779502016-04-23 17:36:10 -07001528 * Gets the spine-facing port on ingress device of given multicast group.
1529 *
1530 * @param mcastIp multicast IP
1531 * @return spine-facing port on ingress device
1532 */
Pier1a7e0c02018-03-12 15:00:54 -07001533 private Set<PortNumber> ingressTransitPort(IpAddress mcastIp) {
Pier979e61a2018-03-07 11:42:50 +01001534 DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
Charles Chan72779502016-04-23 17:36:10 -07001535 .stream().findAny().orElse(null);
Pier1a7e0c02018-03-12 15:00:54 -07001536 ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
Charles Chan72779502016-04-23 17:36:10 -07001537 if (ingressDevice != null) {
1538 NextObjective nextObj = mcastNextObjStore
1539 .get(new McastStoreKey(mcastIp, ingressDevice)).value();
1540 Set<PortNumber> ports = getPorts(nextObj.next());
Pier1a7e0c02018-03-12 15:00:54 -07001541 // Let's find out all the ingress-transit ports
Charles Chan72779502016-04-23 17:36:10 -07001542 for (PortNumber port : ports) {
1543 // Spine-facing port should have no subnet and no xconnect
Pier Luigi69f774d2018-02-28 12:10:50 +01001544 if (srManager.deviceConfiguration() != null &&
1545 srManager.deviceConfiguration().getPortSubnets(ingressDevice, port).isEmpty() &&
Charles Chan82f19972016-05-17 13:13:55 -07001546 !srManager.xConnectHandler.hasXConnect(new ConnectPoint(ingressDevice, port))) {
Pier1a7e0c02018-03-12 15:00:54 -07001547 portBuilder.add(port);
Charles Chan72779502016-04-23 17:36:10 -07001548 }
1549 }
1550 }
Pier1a7e0c02018-03-12 15:00:54 -07001551 return portBuilder.build();
Charles Chan72779502016-04-23 17:36:10 -07001552 }
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001553
1554 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +01001555 * Verify if the given device has sinks
1556 * for the multicast group.
1557 *
1558 * @param deviceId device Id
1559 * @param mcastIp multicast IP
1560 * @return true if the device has sink for the group.
1561 * False otherwise.
1562 */
1563 private boolean hasSinks(DeviceId deviceId, IpAddress mcastIp) {
1564 if (deviceId != null) {
1565 // Get the nextobjective
1566 Versioned<NextObjective> versionedNextObj = mcastNextObjStore.get(
1567 new McastStoreKey(mcastIp, deviceId)
1568 );
1569 // If it exists
1570 if (versionedNextObj != null) {
1571 NextObjective nextObj = versionedNextObj.value();
1572 // Retrieves all the output ports
1573 Set<PortNumber> ports = getPorts(nextObj.next());
1574 // Tries to find at least one port that is not spine-facing
1575 for (PortNumber port : ports) {
1576 // Spine-facing port should have no subnet and no xconnect
Pier Luigi69f774d2018-02-28 12:10:50 +01001577 if (srManager.deviceConfiguration() != null &&
1578 (!srManager.deviceConfiguration().getPortSubnets(deviceId, port).isEmpty() ||
Pier Luigi580fd8a2018-01-16 10:47:50 +01001579 srManager.xConnectHandler.hasXConnect(new ConnectPoint(deviceId, port)))) {
1580 return true;
1581 }
1582 }
1583 }
1584 }
1585 return false;
1586 }
1587
1588 /**
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001589 * Removes filtering objective for given device and port.
1590 *
1591 * @param deviceId device ID
1592 * @param port ingress port number
1593 * @param assignedVlan assigned VLAN ID
1594 * @param mcastIp multicast IP address
1595 */
Pier979e61a2018-03-07 11:42:50 +01001596 private void removeFilterToDevice(DeviceId deviceId, PortNumber port,
1597 VlanId assignedVlan, IpAddress mcastIp, McastRole mcastRole) {
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001598 // Do nothing if the port is configured as suppressed
1599 ConnectPoint connectPoint = new ConnectPoint(deviceId, port);
1600 SegmentRoutingAppConfig appConfig = srManager.cfgService
Pier Luigi69f774d2018-02-28 12:10:50 +01001601 .getConfig(srManager.appId(), SegmentRoutingAppConfig.class);
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001602 if (appConfig != null && appConfig.suppressSubnet().contains(connectPoint)) {
1603 log.info("Ignore suppressed port {}", connectPoint);
1604 return;
1605 }
1606
Charles Chanf909e5b2018-03-02 13:26:22 -08001607 MacAddress routerMac;
1608 try {
1609 routerMac = srManager.deviceConfiguration().getDeviceMac(deviceId);
1610 } catch (DeviceConfigNotFoundException dcnfe) {
1611 log.warn("Fail to push filtering objective since device is not configured. Abort");
1612 return;
1613 }
1614
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001615 FilteringObjective.Builder filtObjBuilder =
Pier979e61a2018-03-07 11:42:50 +01001616 filterObjBuilder(port, assignedVlan, mcastIp, routerMac, mcastRole);
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001617 ObjectiveContext context = new DefaultObjectiveContext(
1618 (objective) -> log.debug("Successfully removed filter on {}/{}, vlan {}",
1619 deviceId, port.toLong(), assignedVlan),
1620 (objective, error) ->
1621 log.warn("Failed to remove filter on {}/{}, vlan {}: {}",
1622 deviceId, port.toLong(), assignedVlan, error));
1623 srManager.flowObjectiveService.filter(deviceId, filtObjBuilder.remove(context));
1624 }
1625
1626 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +01001627 * Updates filtering objective for given device and port.
1628 * It is called in general when the mcast config has been
1629 * changed.
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001630 *
1631 * @param deviceId device ID
1632 * @param portNum ingress port number
1633 * @param vlanId assigned VLAN ID
1634 * @param install true to add, false to remove
1635 */
Pier Luigi69f774d2018-02-28 12:10:50 +01001636 public void updateFilterToDevice(DeviceId deviceId, PortNumber portNum,
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001637 VlanId vlanId, boolean install) {
Pier Luigi35dab3f2018-01-25 16:16:02 +01001638 lastMcastChange = Instant.now();
1639 mcastLock();
1640 try {
1641 // Iterates over the route and updates properly the filtering objective
1642 // on the source device.
1643 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
Pier1f87aca2018-03-14 16:47:32 -07001644 // FIXME To be addressed with multiple sources support
1645 ConnectPoint source = srManager.multicastRouteService.sources(mcastRoute)
1646 .stream()
1647 .findFirst().orElse(null);
Pier Luigi35dab3f2018-01-25 16:16:02 +01001648 if (source.deviceId().equals(deviceId) && source.port().equals(portNum)) {
1649 if (install) {
Pier979e61a2018-03-07 11:42:50 +01001650 addFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), INGRESS);
Pier Luigi35dab3f2018-01-25 16:16:02 +01001651 } else {
Pier979e61a2018-03-07 11:42:50 +01001652 removeFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), null);
Pier Luigi35dab3f2018-01-25 16:16:02 +01001653 }
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001654 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001655 });
1656 } finally {
1657 mcastUnlock();
1658 }
1659 }
1660
Pier Luigi6786b922018-02-02 16:19:11 +01001661 private boolean isLeader(ConnectPoint source) {
1662 // Continue only when we have the mastership on the operation
1663 if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
1664 // When the source is available we just check the mastership
1665 if (srManager.deviceService.isAvailable(source.deviceId())) {
1666 return false;
1667 }
1668 // Fallback with Leadership service
1669 // source id is used a topic
1670 NodeId leader = srManager.leadershipService.runForLeadership(
1671 source.deviceId().toString()).leaderNodeId();
1672 // Verify if this node is the leader
1673 if (!srManager.clusterService.getLocalNode().id().equals(leader)) {
1674 return false;
1675 }
1676 }
1677 // Done
1678 return true;
1679 }
1680
Pier Luigi35dab3f2018-01-25 16:16:02 +01001681 /**
1682 * Performs bucket verification operation for all mcast groups in the devices.
1683 * Firstly, it verifies that mcast is stable before trying verification operation.
1684 * Verification consists in creating new nexts with VERIFY operation. Actually,
1685 * the operation is totally delegated to the driver.
1686 */
1687 private final class McastBucketCorrector implements Runnable {
1688
1689 @Override
1690 public void run() {
1691 // Verify if the Mcast has been stable for MCAST_STABLITY_THRESHOLD
1692 if (!isMcastStable()) {
1693 return;
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001694 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001695 // Acquires lock
1696 mcastLock();
1697 try {
1698 // Iterates over the routes and verify the related next objectives
1699 srManager.multicastRouteService.getRoutes()
1700 .stream()
1701 .map(McastRoute::group)
1702 .forEach(mcastIp -> {
1703 log.trace("Running mcast buckets corrector for mcast group: {}",
1704 mcastIp);
1705
1706 // For each group we get current information in the store
1707 // and issue a check of the next objectives in place
Pier979e61a2018-03-07 11:42:50 +01001708 DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
Pier Luigi35dab3f2018-01-25 16:16:02 +01001709 .stream().findAny().orElse(null);
Pier1a7e0c02018-03-12 15:00:54 -07001710 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT);
Pier979e61a2018-03-07 11:42:50 +01001711 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS);
Pier Luigi92e69be2018-03-02 12:53:37 +01001712 // Get source and sinks from Mcast Route Service and warn about errors
Pier Luigi35dab3f2018-01-25 16:16:02 +01001713 ConnectPoint source = getSource(mcastIp);
Pier Luigi92e69be2018-03-02 12:53:37 +01001714 Set<ConnectPoint> sinks = getSinks(mcastIp);
Pier Luigi35dab3f2018-01-25 16:16:02 +01001715
1716 // Do not proceed if ingress device or source of this group are missing
1717 if (ingressDevice == null || source == null) {
Pier Luigi92e69be2018-03-02 12:53:37 +01001718 if (!sinks.isEmpty()) {
1719 log.warn("Unable to run buckets corrector. " +
1720 "Missing ingress {} or source {} for group {}",
1721 ingressDevice, source, mcastIp);
1722 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001723 return;
1724 }
1725
1726 // Continue only when this instance is the master of source device
1727 if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
1728 log.trace("Unable to run buckets corrector. " +
1729 "Skip {} due to lack of mastership " +
1730 "of the source device {}",
1731 mcastIp, source.deviceId());
1732 return;
1733 }
1734
1735 // Create the set of the devices to be processed
1736 ImmutableSet.Builder<DeviceId> devicesBuilder = ImmutableSet.builder();
1737 devicesBuilder.add(ingressDevice);
Pier1a7e0c02018-03-12 15:00:54 -07001738 if (!transitDevices.isEmpty()) {
1739 devicesBuilder.addAll(transitDevices);
Pier Luigi35dab3f2018-01-25 16:16:02 +01001740 }
1741 if (!egressDevices.isEmpty()) {
1742 devicesBuilder.addAll(egressDevices);
1743 }
1744 Set<DeviceId> devicesToProcess = devicesBuilder.build();
1745
1746 // Iterate over the devices
1747 devicesToProcess.forEach(deviceId -> {
1748 McastStoreKey currentKey = new McastStoreKey(mcastIp, deviceId);
1749 // If next exists in our store verify related next objective
1750 if (mcastNextObjStore.containsKey(currentKey)) {
1751 NextObjective currentNext = mcastNextObjStore.get(currentKey).value();
1752 // Get current ports
1753 Set<PortNumber> currentPorts = getPorts(currentNext.next());
1754 // Rebuild the next objective
1755 currentNext = nextObjBuilder(
1756 mcastIp,
1757 assignedVlan(deviceId.equals(source.deviceId()) ? source : null),
1758 currentPorts,
1759 currentNext.id()
1760 ).verify();
1761 // Send to the flowobjective service
1762 srManager.flowObjectiveService.next(deviceId, currentNext);
1763 } else {
Pier Luigid8a15162018-02-15 16:33:08 +01001764 log.warn("Unable to run buckets corrector. " +
Pier Luigi35dab3f2018-01-25 16:16:02 +01001765 "Missing next for {} and group {}",
1766 deviceId, mcastIp);
1767 }
1768 });
1769
1770 });
1771 } finally {
1772 // Finally, it releases the lock
1773 mcastUnlock();
1774 }
1775
1776 }
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001777 }
Pier Luigi0f9635b2018-01-15 18:06:43 +01001778
1779 public Map<McastStoreKey, Integer> getMcastNextIds(IpAddress mcastIp) {
1780 // If mcast ip is present
1781 if (mcastIp != null) {
1782 return mcastNextObjStore.entrySet().stream()
1783 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
Pier1f87aca2018-03-14 16:47:32 -07001784 .collect(Collectors.toMap(Entry::getKey,
Pier Luigi0f9635b2018-01-15 18:06:43 +01001785 entry -> entry.getValue().value().id()));
1786 }
1787 // Otherwise take all the groups
1788 return mcastNextObjStore.entrySet().stream()
Pier1f87aca2018-03-14 16:47:32 -07001789 .collect(Collectors.toMap(Entry::getKey,
Pier Luigi0f9635b2018-01-15 18:06:43 +01001790 entry -> entry.getValue().value().id()));
1791 }
1792
Pier Luigi69f774d2018-02-28 12:10:50 +01001793 public Map<McastStoreKey, McastRole> getMcastRoles(IpAddress mcastIp) {
Pier Luigi0f9635b2018-01-15 18:06:43 +01001794 // If mcast ip is present
1795 if (mcastIp != null) {
1796 return mcastRoleStore.entrySet().stream()
1797 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
Pier1f87aca2018-03-14 16:47:32 -07001798 .collect(Collectors.toMap(Entry::getKey,
Pier Luigi0f9635b2018-01-15 18:06:43 +01001799 entry -> entry.getValue().value()));
1800 }
1801 // Otherwise take all the groups
1802 return mcastRoleStore.entrySet().stream()
Pier1f87aca2018-03-14 16:47:32 -07001803 .collect(Collectors.toMap(Entry::getKey,
Pier Luigi0f9635b2018-01-15 18:06:43 +01001804 entry -> entry.getValue().value()));
1805 }
1806
1807 public Map<ConnectPoint, List<ConnectPoint>> getMcastPaths(IpAddress mcastIp) {
1808 Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
1809 // Get the source
1810 ConnectPoint source = getSource(mcastIp);
1811 // Source cannot be null, we don't know the starting point
1812 if (source != null) {
1813 // Init steps
1814 Set<DeviceId> visited = Sets.newHashSet();
1815 List<ConnectPoint> currentPath = Lists.newArrayList(
1816 source
1817 );
1818 // Build recursively the mcast paths
1819 buildMcastPaths(source.deviceId(), visited, mcastPaths, currentPath, mcastIp);
1820 }
1821 return mcastPaths;
1822 }
1823
1824 private void buildMcastPaths(DeviceId toVisit, Set<DeviceId> visited,
1825 Map<ConnectPoint, List<ConnectPoint>> mcastPaths,
1826 List<ConnectPoint> currentPath, IpAddress mcastIp) {
1827 // If we have visited the node to visit
1828 // there is a loop
1829 if (visited.contains(toVisit)) {
1830 return;
1831 }
1832 // Visit next-hop
1833 visited.add(toVisit);
1834 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, toVisit);
1835 // Looking for next-hops
1836 if (mcastNextObjStore.containsKey(mcastStoreKey)) {
1837 // Build egress connectpoints
1838 NextObjective nextObjective = mcastNextObjStore.get(mcastStoreKey).value();
1839 // Get Ports
1840 Set<PortNumber> outputPorts = getPorts(nextObjective.next());
1841 // Build relative cps
1842 ImmutableSet.Builder<ConnectPoint> cpBuilder = ImmutableSet.builder();
1843 outputPorts.forEach(portNumber -> cpBuilder.add(new ConnectPoint(toVisit, portNumber)));
1844 Set<ConnectPoint> egressPoints = cpBuilder.build();
1845 // Define other variables for the next steps
1846 Set<Link> egressLinks;
1847 List<ConnectPoint> newCurrentPath;
1848 Set<DeviceId> newVisited;
1849 DeviceId newToVisit;
1850 for (ConnectPoint egressPoint : egressPoints) {
1851 egressLinks = srManager.linkService.getEgressLinks(egressPoint);
1852 // If it does not have egress links, stop
1853 if (egressLinks.isEmpty()) {
1854 // Add the connect points to the path
1855 newCurrentPath = Lists.newArrayList(currentPath);
1856 newCurrentPath.add(0, egressPoint);
1857 // Save in the map
1858 mcastPaths.put(egressPoint, newCurrentPath);
1859 } else {
1860 newVisited = Sets.newHashSet(visited);
1861 // Iterate over the egress links for the next hops
1862 for (Link egressLink : egressLinks) {
1863 // Update to visit
1864 newToVisit = egressLink.dst().deviceId();
1865 // Add the connect points to the path
1866 newCurrentPath = Lists.newArrayList(currentPath);
1867 newCurrentPath.add(0, egressPoint);
1868 newCurrentPath.add(0, egressLink.dst());
1869 // Go to the next hop
1870 buildMcastPaths(newToVisit, newVisited, mcastPaths, newCurrentPath, mcastIp);
1871 }
1872 }
1873 }
1874 }
1875 }
1876
Charles Chanc91c8782016-03-30 17:54:24 -07001877}