blob: 584e51d65ba5a974638142ba08ecdf345e3cbcbf [file] [log] [blame]
Charles Chand55e84d2016-03-30 17:54:24 -07001/*
Pier Luigi96fe0772018-02-28 12:10:50 +01002 * Copyright 2018-present Open Networking Foundation
Charles Chand55e84d2016-03-30 17:54:24 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Pier Luigi96fe0772018-02-28 12:10:50 +010017package org.onosproject.segmentrouting.mcast;
Charles Chand55e84d2016-03-30 17:54:24 -070018
Pier3e793752018-04-19 16:47:06 +020019import com.google.common.base.Objects;
Pier Luigi05514fd2018-02-28 17:24:03 +010020import com.google.common.cache.Cache;
21import com.google.common.cache.CacheBuilder;
22import com.google.common.cache.RemovalCause;
23import com.google.common.cache.RemovalNotification;
Pierb1fe7382018-04-17 17:25:22 +020024import com.google.common.collect.HashMultimap;
Pierb0328e42018-03-27 11:29:42 -070025import com.google.common.collect.ImmutableList;
Charles Chand55e84d2016-03-30 17:54:24 -070026import com.google.common.collect.ImmutableSet;
27import com.google.common.collect.Lists;
Pier Luigibad6d6c2018-01-23 16:06:38 +010028import com.google.common.collect.Maps;
Pierb1fe7382018-04-17 17:25:22 +020029import com.google.common.collect.Multimap;
Charles Chand55e84d2016-03-30 17:54:24 -070030import com.google.common.collect.Sets;
Charles Chand55e84d2016-03-30 17:54:24 -070031import org.onlab.packet.IpAddress;
Charles Chand55e84d2016-03-30 17:54:24 -070032import org.onlab.packet.VlanId;
33import org.onlab.util.KryoNamespace;
Pier96f63cb2018-04-17 16:29:56 +020034import org.onosproject.cluster.NodeId;
Charles Chand55e84d2016-03-30 17:54:24 -070035import org.onosproject.core.ApplicationId;
36import org.onosproject.core.CoreService;
Pier3ee24552018-03-14 16:47:32 -070037import org.onosproject.mcast.api.McastEvent;
38import org.onosproject.mcast.api.McastRoute;
Pierb0328e42018-03-27 11:29:42 -070039import org.onosproject.mcast.api.McastRouteData;
Pier3ee24552018-03-14 16:47:32 -070040import org.onosproject.mcast.api.McastRouteUpdate;
Pierb0328e42018-03-27 11:29:42 -070041import org.onosproject.net.HostId;
Charles Chand55e84d2016-03-30 17:54:24 -070042import org.onosproject.net.ConnectPoint;
43import org.onosproject.net.DeviceId;
44import org.onosproject.net.Link;
45import org.onosproject.net.Path;
46import org.onosproject.net.PortNumber;
Charles Chan2199c302016-04-23 17:36:10 -070047import org.onosproject.net.flowobjective.DefaultObjectiveContext;
Charles Chand55e84d2016-03-30 17:54:24 -070048import org.onosproject.net.flowobjective.ForwardingObjective;
49import org.onosproject.net.flowobjective.NextObjective;
Charles Chan2199c302016-04-23 17:36:10 -070050import org.onosproject.net.flowobjective.ObjectiveContext;
Pier3ee24552018-03-14 16:47:32 -070051import org.onosproject.net.topology.LinkWeigher;
Pier Luigi83f919b2018-02-15 16:33:08 +010052import org.onosproject.net.topology.Topology;
Charles Chand55e84d2016-03-30 17:54:24 -070053import org.onosproject.net.topology.TopologyService;
Pier3ee24552018-03-14 16:47:32 -070054import org.onosproject.segmentrouting.SRLinkWeigher;
Pier Luigi96fe0772018-02-28 12:10:50 +010055import org.onosproject.segmentrouting.SegmentRoutingManager;
Charles Chand55e84d2016-03-30 17:54:24 -070056import org.onosproject.store.serializers.KryoNamespaces;
57import org.onosproject.store.service.ConsistentMap;
58import org.onosproject.store.service.Serializer;
Charles Chand55e84d2016-03-30 17:54:24 -070059import org.slf4j.Logger;
60import org.slf4j.LoggerFactory;
61
Pier Luigib72201b2018-01-25 16:16:02 +010062import java.time.Instant;
Charles Chand55e84d2016-03-30 17:54:24 -070063import java.util.Collection;
64import java.util.Collections;
Pier Luigibad6d6c2018-01-23 16:06:38 +010065import java.util.Comparator;
Charles Chand55e84d2016-03-30 17:54:24 -070066import java.util.List;
Charles Chan2199c302016-04-23 17:36:10 -070067import java.util.Map;
Pier3ee24552018-03-14 16:47:32 -070068import java.util.Map.Entry;
Charles Chand55e84d2016-03-30 17:54:24 -070069import java.util.Optional;
70import java.util.Set;
Pier Luigib72201b2018-01-25 16:16:02 +010071import java.util.concurrent.ScheduledExecutorService;
72import java.util.concurrent.TimeUnit;
73import java.util.concurrent.locks.Lock;
74import java.util.concurrent.locks.ReentrantLock;
Charles Chan2199c302016-04-23 17:36:10 -070075import java.util.stream.Collectors;
76
Pier Luigib72201b2018-01-25 16:16:02 +010077import static java.util.concurrent.Executors.newScheduledThreadPool;
78import static org.onlab.util.Tools.groupedThreads;
Pier3ee24552018-03-14 16:47:32 -070079
Pier96f63cb2018-04-17 16:29:56 +020080import static org.onosproject.mcast.api.McastEvent.Type.ROUTE_ADDED;
Pierb0328e42018-03-27 11:29:42 -070081import static org.onosproject.mcast.api.McastEvent.Type.ROUTE_REMOVED;
Pier96f63cb2018-04-17 16:29:56 +020082import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_ADDED;
83import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_REMOVED;
84import static org.onosproject.mcast.api.McastEvent.Type.SINKS_ADDED;
85import static org.onosproject.mcast.api.McastEvent.Type.SINKS_REMOVED;
86
Piere5bff482018-03-07 11:42:50 +010087import static org.onosproject.segmentrouting.mcast.McastRole.EGRESS;
88import static org.onosproject.segmentrouting.mcast.McastRole.INGRESS;
89import static org.onosproject.segmentrouting.mcast.McastRole.TRANSIT;
Charles Chand55e84d2016-03-30 17:54:24 -070090
91/**
Pier Luigi96fe0772018-02-28 12:10:50 +010092 * Handles Multicast related events.
Charles Chand55e84d2016-03-30 17:54:24 -070093 */
Charles Chand2990362016-04-18 13:44:03 -070094public class McastHandler {
95 private static final Logger log = LoggerFactory.getLogger(McastHandler.class);
Charles Chand55e84d2016-03-30 17:54:24 -070096 private final SegmentRoutingManager srManager;
Charles Chanfc5c7802016-05-17 13:13:55 -070097 private final TopologyService topologyService;
Pier96f63cb2018-04-17 16:29:56 +020098 private final McastUtils mcastUtils;
Charles Chan2199c302016-04-23 17:36:10 -070099 private final ConsistentMap<McastStoreKey, NextObjective> mcastNextObjStore;
Pier3e793752018-04-19 16:47:06 +0200100 private final ConsistentMap<McastRoleStoreKey, McastRole> mcastRoleStore;
Charles Chan2199c302016-04-23 17:36:10 -0700101
Pier Luigi05514fd2018-02-28 17:24:03 +0100102 // Wait time for the cache
103 private static final int WAIT_TIME_MS = 1000;
Pierb0328e42018-03-27 11:29:42 -0700104
Pier3e793752018-04-19 16:47:06 +0200105 //The mcastEventCache is implemented to avoid race condition by giving more time
106 // to the underlying subsystems to process previous calls.
Pier Luigi05514fd2018-02-28 17:24:03 +0100107 private Cache<McastCacheKey, McastEvent> mcastEventCache = CacheBuilder.newBuilder()
108 .expireAfterWrite(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
109 .removalListener((RemovalNotification<McastCacheKey, McastEvent> notification) -> {
Pier Luigi05514fd2018-02-28 17:24:03 +0100110 IpAddress mcastIp = notification.getKey().mcastIp();
Pierb0328e42018-03-27 11:29:42 -0700111 HostId sink = notification.getKey().sinkHost();
Pier Luigi05514fd2018-02-28 17:24:03 +0100112 McastEvent mcastEvent = notification.getValue();
113 RemovalCause cause = notification.getCause();
114 log.debug("mcastEventCache removal event. group={}, sink={}, mcastEvent={}, cause={}",
115 mcastIp, sink, mcastEvent, cause);
Pier3e793752018-04-19 16:47:06 +0200116 // If it expires or it has been replaced, we deque the event - no when evicted
Pier Luigi05514fd2018-02-28 17:24:03 +0100117 switch (notification.getCause()) {
118 case REPLACED:
119 case EXPIRED:
120 dequeueMcastEvent(mcastEvent);
121 break;
122 default:
123 break;
124 }
125 }).build();
126
127 private void enqueueMcastEvent(McastEvent mcastEvent) {
Pier3ee24552018-03-14 16:47:32 -0700128 final McastRouteUpdate mcastRouteUpdate = mcastEvent.subject();
Pierb0328e42018-03-27 11:29:42 -0700129 final McastRouteUpdate mcastRoutePrevUpdate = mcastEvent.prevSubject();
130 final IpAddress group = mcastRoutePrevUpdate.route().group();
Pierb0328e42018-03-27 11:29:42 -0700131 ImmutableSet.Builder<HostId> sinksBuilder = ImmutableSet.builder();
Pier3ee24552018-03-14 16:47:32 -0700132 if (mcastEvent.type() == SOURCES_ADDED ||
133 mcastEvent.type() == SOURCES_REMOVED) {
Pier3e793752018-04-19 16:47:06 +0200134 // Current subject and prev just differ for the source connect points
135 sinksBuilder.addAll(mcastRouteUpdate.sinks().keySet());
Pierb0328e42018-03-27 11:29:42 -0700136 } else if (mcastEvent.type() == SINKS_ADDED) {
Pierb0328e42018-03-27 11:29:42 -0700137 mcastRouteUpdate.sinks().forEach(((hostId, connectPoints) -> {
138 // Get the previous locations and verify if there are changes
139 Set<ConnectPoint> prevConnectPoints = mcastRoutePrevUpdate.sinks().get(hostId);
140 Set<ConnectPoint> changes = Sets.difference(connectPoints, prevConnectPoints != null ?
141 prevConnectPoints : Collections.emptySet());
142 if (!changes.isEmpty()) {
143 sinksBuilder.add(hostId);
Pier3ee24552018-03-14 16:47:32 -0700144 }
Pierb0328e42018-03-27 11:29:42 -0700145 }));
146 } else if (mcastEvent.type() == SINKS_REMOVED) {
Pierb0328e42018-03-27 11:29:42 -0700147 mcastRoutePrevUpdate.sinks().forEach(((hostId, connectPoints) -> {
148 // Get the current locations and verify if there are changes
149 Set<ConnectPoint> currentConnectPoints = mcastRouteUpdate.sinks().get(hostId);
150 Set<ConnectPoint> changes = Sets.difference(connectPoints, currentConnectPoints != null ?
151 currentConnectPoints : Collections.emptySet());
152 if (!changes.isEmpty()) {
153 sinksBuilder.add(hostId);
154 }
155 }));
156 } else if (mcastEvent.type() == ROUTE_REMOVED) {
157 // Current subject is null, just take the previous host ids
158 sinksBuilder.addAll(mcastRoutePrevUpdate.sinks().keySet());
Pier Luigi05514fd2018-02-28 17:24:03 +0100159 }
Pier Luigi05514fd2018-02-28 17:24:03 +0100160 sinksBuilder.build().forEach(sink -> {
Pier3ee24552018-03-14 16:47:32 -0700161 McastCacheKey cacheKey = new McastCacheKey(group, sink);
Pier Luigi05514fd2018-02-28 17:24:03 +0100162 mcastEventCache.put(cacheKey, mcastEvent);
163 });
164 }
165
166 private void dequeueMcastEvent(McastEvent mcastEvent) {
Pierb0328e42018-03-27 11:29:42 -0700167 final McastRouteUpdate mcastUpdate = mcastEvent.subject();
168 final McastRouteUpdate mcastPrevUpdate = mcastEvent.prevSubject();
Pierb0328e42018-03-27 11:29:42 -0700169 IpAddress mcastIp = mcastPrevUpdate.route().group();
Pierb0328e42018-03-27 11:29:42 -0700170 Set<ConnectPoint> prevSinks = mcastPrevUpdate.sinks()
Pier3e793752018-04-19 16:47:06 +0200171 .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
172 Set<ConnectPoint> prevSources = mcastPrevUpdate.sources()
173 .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
174 Set<ConnectPoint> sources;
Pier Luigi05514fd2018-02-28 17:24:03 +0100175 switch (mcastEvent.type()) {
Pier3ee24552018-03-14 16:47:32 -0700176 case SOURCES_ADDED:
Pier3e793752018-04-19 16:47:06 +0200177 sources = mcastUpdate.sources()
178 .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
179 Set<ConnectPoint> sourcesToBeAdded = Sets.difference(sources, prevSources);
180 processSourcesAddedInternal(sourcesToBeAdded, mcastIp, mcastUpdate.sinks());
Pier Luigi05514fd2018-02-28 17:24:03 +0100181 break;
Pier3ee24552018-03-14 16:47:32 -0700182 case SOURCES_REMOVED:
Pier3e793752018-04-19 16:47:06 +0200183 sources = mcastUpdate.sources()
184 .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
185 Set<ConnectPoint> sourcesToBeRemoved = Sets.difference(prevSources, sources);
186 processSourcesRemovedInternal(sourcesToBeRemoved, sources, mcastIp, mcastUpdate.sinks());
Pier Luigi05514fd2018-02-28 17:24:03 +0100187 break;
188 case ROUTE_REMOVED:
Pier3e793752018-04-19 16:47:06 +0200189 processRouteRemovedInternal(prevSources, mcastIp);
Pier Luigi05514fd2018-02-28 17:24:03 +0100190 break;
Pier3ee24552018-03-14 16:47:32 -0700191 case SINKS_ADDED:
Pier3e793752018-04-19 16:47:06 +0200192 processSinksAddedInternal(prevSources, mcastIp, mcastUpdate.sinks(), prevSinks);
Pier Luigi05514fd2018-02-28 17:24:03 +0100193 break;
Pier3ee24552018-03-14 16:47:32 -0700194 case SINKS_REMOVED:
Pier3e793752018-04-19 16:47:06 +0200195 processSinksRemovedInternal(prevSources, mcastIp, mcastUpdate.sinks(), mcastPrevUpdate.sinks());
Pier Luigi05514fd2018-02-28 17:24:03 +0100196 break;
197 default:
198 break;
199 }
200 }
201
Pier Luigib72201b2018-01-25 16:16:02 +0100202 // Mcast lock to serialize local operations
203 private final Lock mcastLock = new ReentrantLock();
Pier Luigib72201b2018-01-25 16:16:02 +0100204 private void mcastLock() {
205 mcastLock.lock();
206 }
Pier Luigib72201b2018-01-25 16:16:02 +0100207 private void mcastUnlock() {
208 mcastLock.unlock();
209 }
Pier Luigib72201b2018-01-25 16:16:02 +0100210 // Stability threshold for Mcast. Seconds
211 private static final long MCAST_STABLITY_THRESHOLD = 5;
212 // Last change done
213 private Instant lastMcastChange = Instant.now();
214
215 /**
216 * Determines if mcast in the network has been stable in the last
217 * MCAST_STABLITY_THRESHOLD seconds, by comparing the current time
218 * to the last mcast change timestamp.
219 *
220 * @return true if stable
221 */
222 private boolean isMcastStable() {
223 long last = (long) (lastMcastChange.toEpochMilli() / 1000.0);
224 long now = (long) (Instant.now().toEpochMilli() / 1000.0);
Saurav Dasa4020382018-02-14 14:14:54 -0800225 log.trace("Mcast stable since {}s", now - last);
Pier Luigib72201b2018-01-25 16:16:02 +0100226 return (now - last) > MCAST_STABLITY_THRESHOLD;
227 }
228
Pier3e793752018-04-19 16:47:06 +0200229 // Verify interval for Mcast bucket corrector
Pier Luigib72201b2018-01-25 16:16:02 +0100230 private static final long MCAST_VERIFY_INTERVAL = 30;
Pier3e793752018-04-19 16:47:06 +0200231 // Executor for mcast bucket corrector and for cache
Pier Luigib72201b2018-01-25 16:16:02 +0100232 private ScheduledExecutorService executorService
Pier Luigi05514fd2018-02-28 17:24:03 +0100233 = newScheduledThreadPool(1, groupedThreads("mcastWorker", "mcastWorker-%d", log));
Pier Luigib72201b2018-01-25 16:16:02 +0100234
Charles Chan2199c302016-04-23 17:36:10 -0700235 /**
Charles Chand55e84d2016-03-30 17:54:24 -0700236 * Constructs the McastEventHandler.
237 *
238 * @param srManager Segment Routing manager
239 */
Charles Chand2990362016-04-18 13:44:03 -0700240 public McastHandler(SegmentRoutingManager srManager) {
Pierb0328e42018-03-27 11:29:42 -0700241 ApplicationId coreAppId = srManager.coreService.getAppId(CoreService.CORE_APP_NAME);
Charles Chand55e84d2016-03-30 17:54:24 -0700242 this.srManager = srManager;
Charles Chand55e84d2016-03-30 17:54:24 -0700243 this.topologyService = srManager.topologyService;
Pierb0328e42018-03-27 11:29:42 -0700244 KryoNamespace.Builder mcastKryo = new KryoNamespace.Builder()
Charles Chand55e84d2016-03-30 17:54:24 -0700245 .register(KryoNamespaces.API)
Pier3e793752018-04-19 16:47:06 +0200246 .register(new McastStoreKeySerializer(), McastStoreKey.class);
Pierb0328e42018-03-27 11:29:42 -0700247 mcastNextObjStore = srManager.storageService
Charles Chan2199c302016-04-23 17:36:10 -0700248 .<McastStoreKey, NextObjective>consistentMapBuilder()
Charles Chand55e84d2016-03-30 17:54:24 -0700249 .withName("onos-mcast-nextobj-store")
Charles Chaneefdedf2016-05-23 16:45:45 -0700250 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-NextObj")))
Charles Chand55e84d2016-03-30 17:54:24 -0700251 .build();
Pier3e793752018-04-19 16:47:06 +0200252 mcastKryo = new KryoNamespace.Builder()
253 .register(KryoNamespaces.API)
254 .register(new McastRoleStoreKeySerializer(), McastRoleStoreKey.class)
255 .register(McastRole.class);
Pierb0328e42018-03-27 11:29:42 -0700256 mcastRoleStore = srManager.storageService
Pier3e793752018-04-19 16:47:06 +0200257 .<McastRoleStoreKey, McastRole>consistentMapBuilder()
Charles Chan2199c302016-04-23 17:36:10 -0700258 .withName("onos-mcast-role-store")
Charles Chaneefdedf2016-05-23 16:45:45 -0700259 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-Role")))
Charles Chan2199c302016-04-23 17:36:10 -0700260 .build();
Pierb0328e42018-03-27 11:29:42 -0700261 mcastUtils = new McastUtils(srManager, coreAppId, log);
Pier3e793752018-04-19 16:47:06 +0200262 // Init the executor service, the buckets corrector and schedule the clean up
Pier Luigib72201b2018-01-25 16:16:02 +0100263 executorService.scheduleWithFixedDelay(new McastBucketCorrector(), 10,
Pierb0328e42018-03-27 11:29:42 -0700264 MCAST_VERIFY_INTERVAL, TimeUnit.SECONDS);
Pier Luigi05514fd2018-02-28 17:24:03 +0100265 executorService.scheduleAtFixedRate(mcastEventCache::cleanUp, 0,
266 WAIT_TIME_MS, TimeUnit.MILLISECONDS);
Charles Chan2199c302016-04-23 17:36:10 -0700267 }
268
269 /**
Pier3e793752018-04-19 16:47:06 +0200270 * Read initial multicast configuration from mcast store.
Charles Chan2199c302016-04-23 17:36:10 -0700271 */
Pier Luigi96fe0772018-02-28 12:10:50 +0100272 public void init() {
Pierb0328e42018-03-27 11:29:42 -0700273 lastMcastChange = Instant.now();
274 mcastLock();
275 try {
276 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
Pier3e793752018-04-19 16:47:06 +0200277 log.debug("Init group {}", mcastRoute.group());
Pier96f63cb2018-04-17 16:29:56 +0200278 if (!mcastUtils.isLeader(mcastRoute.group())) {
279 log.debug("Skip {} due to lack of leadership", mcastRoute.group());
280 return;
281 }
Pierb0328e42018-03-27 11:29:42 -0700282 McastRouteData mcastRouteData = srManager.multicastRouteService.routeData(mcastRoute);
Pier3e793752018-04-19 16:47:06 +0200283 // For each source process the mcast tree
284 srManager.multicastRouteService.sources(mcastRoute).forEach(source -> {
285 Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
286 Set<DeviceId> visited = Sets.newHashSet();
287 List<ConnectPoint> currentPath = Lists.newArrayList(source);
288 buildMcastPaths(source.deviceId(), visited, mcastPaths,
289 currentPath, mcastRoute.group(), source);
290 // Get all the sinks and process them
291 Set<ConnectPoint> sinks = processSinksToBeAdded(source, mcastRoute.group(),
292 mcastRouteData.sinks());
293 // Filter out all the working sinks, we do not want to move them
294 // TODO we need a better way to distinguish flows coming from different sources
295 sinks = sinks.stream()
296 .filter(sink -> !mcastPaths.containsKey(sink) ||
297 !isSinkForSource(mcastRoute.group(), sink, source))
298 .collect(Collectors.toSet());
299 if (sinks.isEmpty()) {
300 log.debug("Skip {} for source {} nothing to do", mcastRoute.group(), source);
301 return;
302 }
303 Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(source.deviceId(), sinks);
304 mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink,
305 mcastRoute.group(), paths));
306 });
Pierb0328e42018-03-27 11:29:42 -0700307 });
308 } finally {
309 mcastUnlock();
310 }
Charles Chand55e84d2016-03-30 17:54:24 -0700311 }
312
313 /**
Pier Luigib72201b2018-01-25 16:16:02 +0100314 * Clean up when deactivating the application.
315 */
Pier Luigi96fe0772018-02-28 12:10:50 +0100316 public void terminate() {
Pier477e0062018-04-20 14:14:34 +0200317 mcastEventCache.invalidateAll();
Pier Luigib72201b2018-01-25 16:16:02 +0100318 executorService.shutdown();
Pier477e0062018-04-20 14:14:34 +0200319 mcastNextObjStore.destroy();
320 mcastRoleStore.destroy();
321 mcastUtils.terminate();
322 log.info("Terminated");
Pier Luigib72201b2018-01-25 16:16:02 +0100323 }
324
325 /**
Pier Luigi05514fd2018-02-28 17:24:03 +0100326 * Processes the SOURCE_ADDED, SOURCE_UPDATED, SINK_ADDED,
Pier3e793752018-04-19 16:47:06 +0200327 * SINK_REMOVED, ROUTE_ADDED and ROUTE_REMOVED events.
Charles Chand55e84d2016-03-30 17:54:24 -0700328 *
329 * @param event McastEvent with SOURCE_ADDED type
330 */
Pier Luigi05514fd2018-02-28 17:24:03 +0100331 public void processMcastEvent(McastEvent event) {
332 log.info("process {}", event);
Pier96f63cb2018-04-17 16:29:56 +0200333 // If it is a route added, we do not enqueue
334 if (event.type() == ROUTE_ADDED) {
Pier96f63cb2018-04-17 16:29:56 +0200335 processRouteAddedInternal(event.subject().route().group());
336 } else {
Pier96f63cb2018-04-17 16:29:56 +0200337 enqueueMcastEvent(event);
338 }
Pier Luigi9930da52018-02-02 16:19:11 +0100339 }
340
341 /**
Pier3e793752018-04-19 16:47:06 +0200342 * Process the SOURCES_ADDED event.
343 *
344 * @param sources the sources connect point
345 * @param mcastIp the group address
346 * @param sinks the sinks connect points
347 */
348 private void processSourcesAddedInternal(Set<ConnectPoint> sources, IpAddress mcastIp,
349 Map<HostId, Set<ConnectPoint>> sinks) {
350 lastMcastChange = Instant.now();
351 mcastLock();
352 try {
353 log.debug("Processing sources added {} for group {}", sources, mcastIp);
354 if (!mcastUtils.isLeader(mcastIp)) {
355 log.debug("Skip {} due to lack of leadership", mcastIp);
356 return;
357 }
358 sources.forEach(source -> {
359 Set<ConnectPoint> sinksToBeAdded = processSinksToBeAdded(source, mcastIp, sinks);
360 Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(source.deviceId(), sinksToBeAdded);
361 mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink, mcastIp, paths));
362 });
363 } finally {
364 mcastUnlock();
365 }
366 }
367
368 /**
369 * Process the SOURCES_REMOVED event.
370 *
371 * @param sourcesToBeRemoved the source connect points to be removed
372 * @param remainingSources the remainig source connect points
373 * @param mcastIp the group address
374 * @param sinks the sinks connect points
375 */
376 private void processSourcesRemovedInternal(Set<ConnectPoint> sourcesToBeRemoved,
377 Set<ConnectPoint> remainingSources,
378 IpAddress mcastIp,
379 Map<HostId, Set<ConnectPoint>> sinks) {
380 lastMcastChange = Instant.now();
381 mcastLock();
382 try {
383 log.debug("Processing sources removed {} for group {}", sourcesToBeRemoved, mcastIp);
384 if (!mcastUtils.isLeader(mcastIp)) {
385 log.debug("Skip {} due to lack of leadership", mcastIp);
386 return;
387 }
388 if (remainingSources.isEmpty()) {
389 processRouteRemovedInternal(sourcesToBeRemoved, mcastIp);
390 return;
391 }
392 // Skip offline devices
393 Set<ConnectPoint> candidateSources = sourcesToBeRemoved.stream()
394 .filter(source -> srManager.deviceService.isAvailable(source.deviceId()))
395 .collect(Collectors.toSet());
396 if (candidateSources.isEmpty()) {
397 log.debug("Skip {} due to empty sources to be removed", mcastIp);
398 return;
399 }
400 Set<Link> remainingLinks = Sets.newHashSet();
401 Map<ConnectPoint, Set<Link>> candidateLinks = Maps.newHashMap();
402 Map<ConnectPoint, Set<ConnectPoint>> candidateSinks = Maps.newHashMap();
403 Set<ConnectPoint> totalSources = Sets.newHashSet(candidateSources);
404 totalSources.addAll(remainingSources);
405 // Calculate all the links used by the sources
406 totalSources.forEach(source -> {
407 Set<ConnectPoint> currentSinks = sinks.values()
408 .stream().flatMap(Collection::stream)
409 .filter(sink -> isSinkForSource(mcastIp, sink, source))
410 .collect(Collectors.toSet());
411 candidateSinks.put(source, currentSinks);
412 currentSinks.forEach(currentSink -> {
413 Optional<Path> currentPath = getPath(source.deviceId(), currentSink.deviceId(),
414 mcastIp, null, source);
415 if (currentPath.isPresent()) {
416 if (!candidateSources.contains(source)) {
417 remainingLinks.addAll(currentPath.get().links());
418 } else {
419 candidateLinks.put(source, Sets.newHashSet(currentPath.get().links()));
420 }
421 }
422 });
423 });
424 // Clean transit links
425 candidateLinks.forEach((source, currentCandidateLinks) -> {
426 Set<Link> linksToBeRemoved = Sets.difference(currentCandidateLinks, remainingLinks)
427 .immutableCopy();
428 if (!linksToBeRemoved.isEmpty()) {
429 currentCandidateLinks.forEach(link -> {
430 DeviceId srcLink = link.src().deviceId();
431 // Remove ports only on links to be removed
432 if (linksToBeRemoved.contains(link)) {
433 removePortFromDevice(link.src().deviceId(), link.src().port(), mcastIp,
434 mcastUtils.assignedVlan(srcLink.equals(source.deviceId()) ?
435 source : null));
436 }
437 // Remove role on the candidate links
438 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, srcLink, source));
439 });
440 }
441 });
442 // Clean ingress and egress
443 candidateSources.forEach(source -> {
444 Set<ConnectPoint> currentSinks = candidateSinks.get(source);
445 currentSinks.forEach(currentSink -> {
446 VlanId assignedVlan = mcastUtils.assignedVlan(source.deviceId().equals(currentSink.deviceId()) ?
447 source : null);
448 // Sinks co-located with the source
449 if (source.deviceId().equals(currentSink.deviceId())) {
450 if (source.port().equals(currentSink.port())) {
451 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
452 mcastIp, currentSink, source);
453 return;
454 }
455 // We need to check against the other sources and if it is
456 // necessary remove the port from the device - no overlap
457 Set<VlanId> otherVlans = remainingSources.stream()
458 // Only sources co-located and having this sink
459 .filter(remainingSource -> remainingSource.deviceId()
460 .equals(source.deviceId()) && candidateSinks.get(remainingSource)
461 .contains(currentSink))
462 .map(remainingSource -> mcastUtils.assignedVlan(
463 remainingSource.deviceId().equals(currentSink.deviceId()) ?
464 remainingSource : null)).collect(Collectors.toSet());
465 if (!otherVlans.contains(assignedVlan)) {
466 removePortFromDevice(currentSink.deviceId(), currentSink.port(),
467 mcastIp, assignedVlan);
468 }
469 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, currentSink.deviceId(),
470 source));
471 return;
472 }
473 Set<VlanId> otherVlans = remainingSources.stream()
474 .filter(remainingSource -> candidateSinks.get(remainingSource)
475 .contains(currentSink))
476 .map(remainingSource -> mcastUtils.assignedVlan(
477 remainingSource.deviceId().equals(currentSink.deviceId()) ?
478 remainingSource : null)).collect(Collectors.toSet());
479 // Sinks on other leaves
480 if (!otherVlans.contains(assignedVlan)) {
481 removePortFromDevice(currentSink.deviceId(), currentSink.port(),
482 mcastIp, assignedVlan);
483 }
484 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, currentSink.deviceId(),
485 source));
486 });
487 });
488 } finally {
489 mcastUnlock();
490 }
491 }
492
493 /**
Pier96f63cb2018-04-17 16:29:56 +0200494 * Process the ROUTE_ADDED event.
Pier Luigi57d41792018-02-26 12:31:38 +0100495 *
Pier96f63cb2018-04-17 16:29:56 +0200496 * @param mcastIp the group address
Pier Luigi57d41792018-02-26 12:31:38 +0100497 */
Pier96f63cb2018-04-17 16:29:56 +0200498 private void processRouteAddedInternal(IpAddress mcastIp) {
Pier Luigi57d41792018-02-26 12:31:38 +0100499 lastMcastChange = Instant.now();
500 mcastLock();
501 try {
Pier96f63cb2018-04-17 16:29:56 +0200502 log.debug("Processing route added for group {}", mcastIp);
503 // Just elect a new leader
504 mcastUtils.isLeader(mcastIp);
Pier Luigi57d41792018-02-26 12:31:38 +0100505 } finally {
506 mcastUnlock();
507 }
508 }
509
510 /**
Pier Luigi9930da52018-02-02 16:19:11 +0100511 * Removes the entire mcast tree related to this group.
Pier3e793752018-04-19 16:47:06 +0200512 * @param sources the source connect points
Pier Luigi9930da52018-02-02 16:19:11 +0100513 * @param mcastIp multicast group IP address
514 */
Pier3e793752018-04-19 16:47:06 +0200515 private void processRouteRemovedInternal(Set<ConnectPoint> sources, IpAddress mcastIp) {
Pier Luigi9930da52018-02-02 16:19:11 +0100516 lastMcastChange = Instant.now();
517 mcastLock();
518 try {
Pier Luigi57d41792018-02-26 12:31:38 +0100519 log.debug("Processing route removed for group {}", mcastIp);
Pier96f63cb2018-04-17 16:29:56 +0200520 if (!mcastUtils.isLeader(mcastIp)) {
521 log.debug("Skip {} due to lack of leadership", mcastIp);
522 mcastUtils.withdrawLeader(mcastIp);
523 return;
524 }
Pier3e793752018-04-19 16:47:06 +0200525 sources.forEach(source -> {
526 // Find out the ingress, transit and egress device of the affected group
527 DeviceId ingressDevice = getDevice(mcastIp, INGRESS, source)
528 .stream().findFirst().orElse(null);
529 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
530 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
531 // If there are no egress and transit devices, sinks could be only on the ingress
532 if (!egressDevices.isEmpty()) {
533 egressDevices.forEach(deviceId -> {
534 removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null));
535 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, deviceId, source));
536 });
537 }
538 if (!transitDevices.isEmpty()) {
539 transitDevices.forEach(deviceId -> {
540 removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null));
541 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, deviceId, source));
542 });
543 }
544 if (ingressDevice != null) {
545 removeGroupFromDevice(ingressDevice, mcastIp, mcastUtils.assignedVlan(source));
546 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, ingressDevice, source));
547 }
548 });
549 // Finally, withdraw the leadership
550 mcastUtils.withdrawLeader(mcastIp);
Pier Luigi9930da52018-02-02 16:19:11 +0100551 } finally {
552 mcastUnlock();
553 }
554 }
555
Pierb0328e42018-03-27 11:29:42 -0700556 /**
557 * Process sinks to be removed.
558 *
Pier3e793752018-04-19 16:47:06 +0200559 * @param sources the source connect points
Pierb0328e42018-03-27 11:29:42 -0700560 * @param mcastIp the ip address of the group
561 * @param newSinks the new sinks to be processed
Pier3bb1f3f2018-04-17 15:50:43 +0200562 * @param prevSinks the previous sinks
Pierb0328e42018-03-27 11:29:42 -0700563 */
Pier3e793752018-04-19 16:47:06 +0200564 private void processSinksRemovedInternal(Set<ConnectPoint> sources, IpAddress mcastIp,
Pierb0328e42018-03-27 11:29:42 -0700565 Map<HostId, Set<ConnectPoint>> newSinks,
Pier3bb1f3f2018-04-17 15:50:43 +0200566 Map<HostId, Set<ConnectPoint>> prevSinks) {
Pierb0328e42018-03-27 11:29:42 -0700567 lastMcastChange = Instant.now();
568 mcastLock();
Pierb0328e42018-03-27 11:29:42 -0700569 try {
Pier96f63cb2018-04-17 16:29:56 +0200570 if (!mcastUtils.isLeader(mcastIp)) {
571 log.debug("Skip {} due to lack of leadership", mcastIp);
572 return;
573 }
Pier3e793752018-04-19 16:47:06 +0200574 log.debug("Processing sinks removed for group {} and for sources {}",
575 mcastIp, sources);
576 Map<ConnectPoint, Map<ConnectPoint, Optional<Path>>> treesToBeRemoved = Maps.newHashMap();
577 Map<ConnectPoint, Set<ConnectPoint>> treesToBeAdded = Maps.newHashMap();
578 sources.forEach(source -> {
579 // Save the path associated to the sinks to be removed
580 Set<ConnectPoint> sinksToBeRemoved = processSinksToBeRemoved(mcastIp, prevSinks,
581 newSinks, source);
582 Map<ConnectPoint, Optional<Path>> treeToBeRemoved = Maps.newHashMap();
583 sinksToBeRemoved.forEach(sink -> treeToBeRemoved.put(sink, getPath(source.deviceId(),
584 sink.deviceId(), mcastIp,
585 null, source)));
586 treesToBeRemoved.put(source, treeToBeRemoved);
587 // Recover the dual-homed sinks
588 Set<ConnectPoint> sinksToBeRecovered = processSinksToBeRecovered(mcastIp, newSinks,
589 prevSinks, source);
590 treesToBeAdded.put(source, sinksToBeRecovered);
591 });
592 // Remove the sinks taking into account the multiple sources and the original paths
593 treesToBeRemoved.forEach((source, tree) ->
594 tree.forEach((sink, path) -> processSinkRemovedInternal(source, sink, mcastIp, path)));
595 // Add new sinks according to the recovery procedure
596 treesToBeAdded.forEach((source, sinks) ->
597 sinks.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null)));
Pierb0328e42018-03-27 11:29:42 -0700598 } finally {
599 mcastUnlock();
Pierb0328e42018-03-27 11:29:42 -0700600 }
601 }
602
Pier Luigi9930da52018-02-02 16:19:11 +0100603 /**
Pier Luigib72201b2018-01-25 16:16:02 +0100604 * Removes a path from source to sink for given multicast group.
605 *
606 * @param source connect point of the multicast source
607 * @param sink connection point of the multicast sink
608 * @param mcastIp multicast group IP address
Pier3e793752018-04-19 16:47:06 +0200609 * @param mcastPath path associated to the sink
Pier Luigib72201b2018-01-25 16:16:02 +0100610 */
611 private void processSinkRemovedInternal(ConnectPoint source, ConnectPoint sink,
Pier3e793752018-04-19 16:47:06 +0200612 IpAddress mcastIp, Optional<Path> mcastPath) {
Pier Luigib72201b2018-01-25 16:16:02 +0100613 lastMcastChange = Instant.now();
614 mcastLock();
615 try {
Pier3e793752018-04-19 16:47:06 +0200616 log.debug("Processing sink removed {} for group {} and for source {}", sink, mcastIp, source);
Pierb0328e42018-03-27 11:29:42 -0700617 boolean isLast;
Pier Luigib72201b2018-01-25 16:16:02 +0100618 // When source and sink are on the same device
619 if (source.deviceId().equals(sink.deviceId())) {
620 // Source and sink are on even the same port. There must be something wrong.
621 if (source.port().equals(sink.port())) {
Pier3e793752018-04-19 16:47:06 +0200622 log.warn("Skip {} since sink {} is on the same port of source {}. Abort", mcastIp, sink, source);
Pier Luigib72201b2018-01-25 16:16:02 +0100623 return;
624 }
Pierb0328e42018-03-27 11:29:42 -0700625 isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
Pier Luigib87b8ab2018-03-02 12:53:37 +0100626 if (isLast) {
Pier3e793752018-04-19 16:47:06 +0200627 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, sink.deviceId(), source));
Pier Luigib87b8ab2018-03-02 12:53:37 +0100628 }
Pier Luigib72201b2018-01-25 16:16:02 +0100629 return;
630 }
Pier Luigib72201b2018-01-25 16:16:02 +0100631 // Process the egress device
Pierb0328e42018-03-27 11:29:42 -0700632 isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
Pier Luigib72201b2018-01-25 16:16:02 +0100633 if (isLast) {
Pier3e793752018-04-19 16:47:06 +0200634 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, sink.deviceId(), source));
Pier Luigib72201b2018-01-25 16:16:02 +0100635 }
Pier Luigib72201b2018-01-25 16:16:02 +0100636 // If this is the last sink on the device, also update upstream
Pier Luigib72201b2018-01-25 16:16:02 +0100637 if (mcastPath.isPresent()) {
638 List<Link> links = Lists.newArrayList(mcastPath.get().links());
639 Collections.reverse(links);
640 for (Link link : links) {
641 if (isLast) {
Pier3e793752018-04-19 16:47:06 +0200642 isLast = removePortFromDevice(link.src().deviceId(), link.src().port(), mcastIp,
643 mcastUtils.assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
Pier Luigib87b8ab2018-03-02 12:53:37 +0100644 if (isLast) {
Pier3e793752018-04-19 16:47:06 +0200645 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, link.src().deviceId(), source));
Pier Luigib87b8ab2018-03-02 12:53:37 +0100646 }
Pier Luigib72201b2018-01-25 16:16:02 +0100647 }
Charles Chand55e84d2016-03-30 17:54:24 -0700648 }
649 }
Pier Luigib72201b2018-01-25 16:16:02 +0100650 } finally {
651 mcastUnlock();
Charles Chand55e84d2016-03-30 17:54:24 -0700652 }
653 }
654
Pierb0328e42018-03-27 11:29:42 -0700655
656 /**
657 * Process sinks to be added.
658 *
Pier3e793752018-04-19 16:47:06 +0200659 * @param sources the source connect points
Pierb0328e42018-03-27 11:29:42 -0700660 * @param mcastIp the group IP
661 * @param newSinks the new sinks to be processed
662 * @param allPrevSinks all previous sinks
663 */
Pier3e793752018-04-19 16:47:06 +0200664 private void processSinksAddedInternal(Set<ConnectPoint> sources, IpAddress mcastIp,
Pierb0328e42018-03-27 11:29:42 -0700665 Map<HostId, Set<ConnectPoint>> newSinks,
666 Set<ConnectPoint> allPrevSinks) {
667 lastMcastChange = Instant.now();
668 mcastLock();
669 try {
Pier96f63cb2018-04-17 16:29:56 +0200670 if (!mcastUtils.isLeader(mcastIp)) {
671 log.debug("Skip {} due to lack of leadership", mcastIp);
672 return;
673 }
Pier3e793752018-04-19 16:47:06 +0200674 log.debug("Processing sinks added for group {} and for sources {}", mcastIp, sources);
675 sources.forEach(source -> {
676 Set<ConnectPoint> sinksToBeAdded = processSinksToBeAdded(source, mcastIp, newSinks);
677 sinksToBeAdded = Sets.difference(sinksToBeAdded, allPrevSinks);
678 sinksToBeAdded.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null));
679 });
Pierb0328e42018-03-27 11:29:42 -0700680 } finally {
681 mcastUnlock();
682 }
683 }
684
Charles Chand55e84d2016-03-30 17:54:24 -0700685 /**
686 * Establishes a path from source to sink for given multicast group.
687 *
688 * @param source connect point of the multicast source
689 * @param sink connection point of the multicast sink
690 * @param mcastIp multicast group IP address
691 */
692 private void processSinkAddedInternal(ConnectPoint source, ConnectPoint sink,
Pierb0328e42018-03-27 11:29:42 -0700693 IpAddress mcastIp, List<Path> allPaths) {
Pier Luigib72201b2018-01-25 16:16:02 +0100694 lastMcastChange = Instant.now();
695 mcastLock();
696 try {
Pier3e793752018-04-19 16:47:06 +0200697 log.debug("Processing sink added {} for group {} and for source {}", sink, mcastIp, source);
Pier Luigib72201b2018-01-25 16:16:02 +0100698 // Process the ingress device
Pierb0328e42018-03-27 11:29:42 -0700699 mcastUtils.addFilterToDevice(source.deviceId(), source.port(),
Pier3e793752018-04-19 16:47:06 +0200700 mcastUtils.assignedVlan(source), mcastIp, INGRESS);
Pier Luigib72201b2018-01-25 16:16:02 +0100701 if (source.deviceId().equals(sink.deviceId())) {
Pier Luigib72201b2018-01-25 16:16:02 +0100702 if (source.port().equals(sink.port())) {
703 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
704 mcastIp, sink, source);
705 return;
706 }
Pierb0328e42018-03-27 11:29:42 -0700707 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
Pier3e793752018-04-19 16:47:06 +0200708 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, sink.deviceId(), source), INGRESS);
Pier Luigib72201b2018-01-25 16:16:02 +0100709 return;
710 }
Pier Luigib72201b2018-01-25 16:16:02 +0100711 // Find a path. If present, create/update groups and flows for each hop
Pier3e793752018-04-19 16:47:06 +0200712 Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp, allPaths, source);
Pier Luigib72201b2018-01-25 16:16:02 +0100713 if (mcastPath.isPresent()) {
714 List<Link> links = mcastPath.get().links();
Pier37db3692018-03-12 15:00:54 -0700715 // Setup mcast role for ingress
Pier3e793752018-04-19 16:47:06 +0200716 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, source.deviceId(), source), INGRESS);
717 // Setup properly the transit forwarding
Pier Luigib72201b2018-01-25 16:16:02 +0100718 links.forEach(link -> {
719 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
Pierb0328e42018-03-27 11:29:42 -0700720 mcastUtils.assignedVlan(link.src().deviceId()
721 .equals(source.deviceId()) ? source : null));
722 mcastUtils.addFilterToDevice(link.dst().deviceId(), link.dst().port(),
723 mcastUtils.assignedVlan(null), mcastIp, null);
Pier Luigib72201b2018-01-25 16:16:02 +0100724 });
Pier37db3692018-03-12 15:00:54 -0700725 // Setup mcast role for the transit
726 links.stream()
727 .filter(link -> !link.dst().deviceId().equals(sink.deviceId()))
Pier3e793752018-04-19 16:47:06 +0200728 .forEach(link -> mcastRoleStore.put(new McastRoleStoreKey(mcastIp, link.dst().deviceId(),
729 source), TRANSIT));
Pier Luigib72201b2018-01-25 16:16:02 +0100730 // Process the egress device
Pierb0328e42018-03-27 11:29:42 -0700731 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
Pier37db3692018-03-12 15:00:54 -0700732 // Setup mcast role for egress
Pier3e793752018-04-19 16:47:06 +0200733 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, sink.deviceId(), source), EGRESS);
Pier Luigib72201b2018-01-25 16:16:02 +0100734 } else {
Pier3e793752018-04-19 16:47:06 +0200735 log.warn("Unable to find a path from {} to {}. Abort sinkAdded", source.deviceId(), sink.deviceId());
Pier Luigib72201b2018-01-25 16:16:02 +0100736 }
737 } finally {
738 mcastUnlock();
Charles Chand55e84d2016-03-30 17:54:24 -0700739 }
740 }
741
742 /**
Charles Chan2199c302016-04-23 17:36:10 -0700743 * Processes the LINK_DOWN event.
744 *
745 * @param affectedLink Link that is going down
746 */
Pier Luigi96fe0772018-02-28 12:10:50 +0100747 public void processLinkDown(Link affectedLink) {
Pier Luigib72201b2018-01-25 16:16:02 +0100748 lastMcastChange = Instant.now();
749 mcastLock();
750 try {
751 // Get groups affected by the link down event
752 getAffectedGroups(affectedLink).forEach(mcastIp -> {
Pier3e793752018-04-19 16:47:06 +0200753 log.debug("Processing link down {} for group {}", affectedLink, mcastIp);
754 recoverFailure(mcastIp, affectedLink);
Charles Chan2199c302016-04-23 17:36:10 -0700755 });
Pier Luigib72201b2018-01-25 16:16:02 +0100756 } finally {
757 mcastUnlock();
758 }
Charles Chan2199c302016-04-23 17:36:10 -0700759 }
760
761 /**
Pier Luigieba73a02018-01-16 10:47:50 +0100762 * Process the DEVICE_DOWN event.
763 *
764 * @param deviceDown device going down
765 */
Pier Luigi96fe0772018-02-28 12:10:50 +0100766 public void processDeviceDown(DeviceId deviceDown) {
Pier Luigib72201b2018-01-25 16:16:02 +0100767 lastMcastChange = Instant.now();
768 mcastLock();
769 try {
770 // Get the mcast groups affected by the device going down
771 getAffectedGroups(deviceDown).forEach(mcastIp -> {
Pier3e793752018-04-19 16:47:06 +0200772 log.debug("Processing device down {} for group {}", deviceDown, mcastIp);
773 recoverFailure(mcastIp, deviceDown);
Pier Luigib72201b2018-01-25 16:16:02 +0100774 });
775 } finally {
776 mcastUnlock();
777 }
Pier Luigieba73a02018-01-16 10:47:50 +0100778 }
779
780 /**
Pier3e793752018-04-19 16:47:06 +0200781 * General failure recovery procedure.
782 *
783 * @param mcastIp the group to recover
784 * @param failedElement the failed element
785 */
786 private void recoverFailure(IpAddress mcastIp, Object failedElement) {
787 // TODO Optimize when the group editing is in place
788 if (!mcastUtils.isLeader(mcastIp)) {
789 log.debug("Skip {} due to lack of leadership", mcastIp);
790 return;
791 }
792 // Do not proceed if the sources of this group are missing
793 Set<ConnectPoint> sources = getSources(mcastIp);
794 if (sources.isEmpty()) {
795 log.warn("Missing sources for group {}", mcastIp);
796 return;
797 }
798 // Find out the ingress devices of the affected group
799 // If sinks are in other leafs, we have ingress, transit, egress, and source
800 // If sinks are in the same leaf, we have just ingress and source
801 Set<DeviceId> ingressDevices = getDevice(mcastIp, INGRESS);
802 if (ingressDevices.isEmpty()) {
803 log.warn("Missing ingress devices for group {}", ingressDevices, mcastIp);
804 return;
805 }
806 // For each tree, delete ingress-transit part
807 sources.forEach(source -> {
808 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
809 transitDevices.forEach(transitDevice -> {
810 removeGroupFromDevice(transitDevice, mcastIp, mcastUtils.assignedVlan(null));
811 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, transitDevice, source));
812 });
813 });
814 removeIngressTransitPorts(mcastIp, ingressDevices, sources);
815 // TODO Evaluate the possibility of building optimize trees between sources
816 Map<DeviceId, Set<ConnectPoint>> notRecovered = Maps.newHashMap();
817 sources.forEach(source -> {
818 Set<DeviceId> notRecoveredInternal = Sets.newHashSet();
819 DeviceId ingressDevice = ingressDevices.stream()
820 .filter(deviceId -> deviceId.equals(source.deviceId())).findFirst().orElse(null);
821 // Clean also the ingress
822 if (failedElement instanceof DeviceId && ingressDevice.equals(failedElement)) {
823 removeGroupFromDevice((DeviceId) failedElement, mcastIp, mcastUtils.assignedVlan(source));
824 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, (DeviceId) failedElement, source));
825 }
826 if (ingressDevice == null) {
827 log.warn("Skip failure recovery - " +
828 "Missing ingress for source {} and group {}", source, mcastIp);
829 return;
830 }
831 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
832 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(ingressDevice, egressDevices);
833 // We have to verify, if there are egresses without paths
834 mcastTree.forEach((egressDevice, paths) -> {
835 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
836 mcastIp, paths, source);
837 // No paths, we have to try with alternative location
838 if (!mcastPath.isPresent()) {
839 notRecovered.compute(egressDevice, (deviceId, listSources) -> {
840 listSources = listSources == null ? Sets.newHashSet() : listSources;
841 listSources.add(source);
842 return listSources;
843 });
844 notRecoveredInternal.add(egressDevice);
845 }
846 });
847 // Fast path, we can recover all the locations
848 if (notRecoveredInternal.isEmpty()) {
849 mcastTree.forEach((egressDevice, paths) -> {
850 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
851 mcastIp, paths, source);
852 if (mcastPath.isPresent()) {
853 installPath(mcastIp, source, mcastPath.get());
854 }
855 });
856 } else {
857 // Let's try to recover using alternative locations
858 recoverSinks(egressDevices, notRecoveredInternal, mcastIp,
859 ingressDevice, source);
860 }
861 });
862 // Finally remove the egresses not recovered
863 notRecovered.forEach((egressDevice, listSources) -> {
864 Set<ConnectPoint> currentSources = getSources(mcastIp, egressDevice, EGRESS);
865 if (Objects.equal(currentSources, listSources)) {
866 log.warn("Fail to recover egress device {} from {} failure {}",
867 egressDevice, failedElement instanceof Link ? "Link" : "Device", failedElement);
868 removeGroupFromDevice(egressDevice, mcastIp, mcastUtils.assignedVlan(null));
869 }
870 listSources.forEach(source -> mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, egressDevice, source)));
871 });
872 }
873
874 /**
Pierb0328e42018-03-27 11:29:42 -0700875 * Try to recover sinks using alternate locations.
876 *
877 * @param egressDevices the original egress devices
878 * @param notRecovered the devices not recovered
879 * @param mcastIp the group address
880 * @param ingressDevice the ingress device
881 * @param source the source connect point
Pierb0328e42018-03-27 11:29:42 -0700882 */
883 private void recoverSinks(Set<DeviceId> egressDevices, Set<DeviceId> notRecovered,
Pier3e793752018-04-19 16:47:06 +0200884 IpAddress mcastIp, DeviceId ingressDevice, ConnectPoint source) {
885 log.debug("Processing recover sinks for group {} and for source {}",
886 mcastIp, source);
Pierb0328e42018-03-27 11:29:42 -0700887 Set<DeviceId> recovered = Sets.difference(egressDevices, notRecovered);
Pierb0328e42018-03-27 11:29:42 -0700888 Set<ConnectPoint> totalAffectedSinks = Sets.newHashSet();
Pierb0328e42018-03-27 11:29:42 -0700889 Set<ConnectPoint> totalSinks = Sets.newHashSet();
890 // Let's compute all the affected sinks and all the sinks
891 notRecovered.forEach(deviceId -> {
892 totalAffectedSinks.addAll(
Pier3e793752018-04-19 16:47:06 +0200893 mcastUtils.getAffectedSinks(deviceId, mcastIp).values().stream()
Pierb0328e42018-03-27 11:29:42 -0700894 .flatMap(Collection::stream)
895 .filter(connectPoint -> connectPoint.deviceId().equals(deviceId))
896 .collect(Collectors.toSet())
897 );
898 totalSinks.addAll(
Pier3e793752018-04-19 16:47:06 +0200899 mcastUtils.getAffectedSinks(deviceId, mcastIp).values().stream()
900 .flatMap(Collection::stream).collect(Collectors.toSet())
Pierb0328e42018-03-27 11:29:42 -0700901 );
902 });
Pierb0328e42018-03-27 11:29:42 -0700903 Set<ConnectPoint> sinksToBeAdded = Sets.difference(totalSinks, totalAffectedSinks);
Pier3e793752018-04-19 16:47:06 +0200904 Set<DeviceId> newEgressDevices = sinksToBeAdded.stream()
905 .map(ConnectPoint::deviceId).collect(Collectors.toSet());
906 newEgressDevices.addAll(recovered);
907 Set<DeviceId> copyNewEgressDevices = ImmutableSet.copyOf(newEgressDevices);
908 newEgressDevices = newEgressDevices.stream()
909 .filter(deviceId -> !deviceId.equals(ingressDevice)).collect(Collectors.toSet());
910 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(ingressDevice, newEgressDevices);
Pierb0328e42018-03-27 11:29:42 -0700911 // if the source was originally in the new locations, add new sinks
Pier3e793752018-04-19 16:47:06 +0200912 if (copyNewEgressDevices.contains(ingressDevice)) {
Pierb0328e42018-03-27 11:29:42 -0700913 sinksToBeAdded.stream()
914 .filter(connectPoint -> connectPoint.deviceId().equals(ingressDevice))
915 .forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, ImmutableList.of()));
916 }
Pierb0328e42018-03-27 11:29:42 -0700917 // Construct a new path for each egress device
918 mcastTree.forEach((egressDevice, paths) -> {
Pier3e793752018-04-19 16:47:06 +0200919 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice, mcastIp, paths, source);
Pierb0328e42018-03-27 11:29:42 -0700920 if (mcastPath.isPresent()) {
921 // Using recovery procedure
922 if (recovered.contains(egressDevice)) {
923 installPath(mcastIp, source, mcastPath.get());
924 } else {
925 // otherwise we need to threat as new sink
926 sinksToBeAdded.stream()
927 .filter(connectPoint -> connectPoint.deviceId().equals(egressDevice))
928 .forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, paths));
929 }
Pierb0328e42018-03-27 11:29:42 -0700930 }
931 });
Pierb0328e42018-03-27 11:29:42 -0700932 }
933
934 /**
Pier3bb1f3f2018-04-17 15:50:43 +0200935 * Process all the sinks related to a mcast group and return
936 * the ones to be removed.
937 *
938 * @param mcastIp the group address
939 * @param prevsinks the previous sinks to be evaluated
940 * @param newSinks the new sinks to be evaluted
Pier3e793752018-04-19 16:47:06 +0200941 * @param source the source connect point
Pier3bb1f3f2018-04-17 15:50:43 +0200942 * @return the set of the sinks to be removed
943 */
944 private Set<ConnectPoint> processSinksToBeRemoved(IpAddress mcastIp,
945 Map<HostId, Set<ConnectPoint>> prevsinks,
Pier3e793752018-04-19 16:47:06 +0200946 Map<HostId, Set<ConnectPoint>> newSinks,
947 ConnectPoint source) {
Pier3bb1f3f2018-04-17 15:50:43 +0200948 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
949 prevsinks.forEach(((hostId, connectPoints) -> {
950 // We have to check with the existing flows
951 ConnectPoint sinkToBeProcessed = connectPoints.stream()
Pier3e793752018-04-19 16:47:06 +0200952 .filter(connectPoint -> isSinkForSource(mcastIp, connectPoint, source))
Pier3bb1f3f2018-04-17 15:50:43 +0200953 .findFirst().orElse(null);
954 if (sinkToBeProcessed != null) {
955 // If the host has been removed or location has been removed
956 if (!newSinks.containsKey(hostId) ||
957 !newSinks.get(hostId).contains(sinkToBeProcessed)) {
958 sinksToBeProcessed.add(sinkToBeProcessed);
959 }
960 }
961 }));
962 // We have done, return the set
963 return sinksToBeProcessed;
964 }
965
966 /**
Pierb0328e42018-03-27 11:29:42 -0700967 * Process new locations and return the set of sinks to be added
968 * in the context of the recovery.
969 *
Pier3bb1f3f2018-04-17 15:50:43 +0200970 * @param newSinks the remaining sinks
971 * @param prevSinks the previous sinks
Pier3e793752018-04-19 16:47:06 +0200972 * @param source the source connect point
Pierb0328e42018-03-27 11:29:42 -0700973 * @return the set of the sinks to be processed
974 */
975 private Set<ConnectPoint> processSinksToBeRecovered(IpAddress mcastIp,
Pier3bb1f3f2018-04-17 15:50:43 +0200976 Map<HostId, Set<ConnectPoint>> newSinks,
Pier3e793752018-04-19 16:47:06 +0200977 Map<HostId, Set<ConnectPoint>> prevSinks,
978 ConnectPoint source) {
Pierb0328e42018-03-27 11:29:42 -0700979 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
Pier3bb1f3f2018-04-17 15:50:43 +0200980 newSinks.forEach((hostId, connectPoints) -> {
Pierb0328e42018-03-27 11:29:42 -0700981 // If it has more than 1 locations
982 if (connectPoints.size() > 1 || connectPoints.size() == 0) {
983 log.debug("Skip {} since sink {} has {} locations",
984 mcastIp, hostId, connectPoints.size());
985 return;
986 }
Pier3bb1f3f2018-04-17 15:50:43 +0200987 // If previously it had two locations, we need to recover it
988 // Filter out if the remaining location is already served
989 if (prevSinks.containsKey(hostId) && prevSinks.get(hostId).size() == 2) {
Pier06d0a772018-04-19 15:53:20 +0200990 ConnectPoint sinkToBeProcessed = connectPoints.stream()
Pier3e793752018-04-19 16:47:06 +0200991 .filter(connectPoint -> !isSinkForSource(mcastIp, connectPoint, source))
Pier06d0a772018-04-19 15:53:20 +0200992 .findFirst().orElse(null);
993 if (sinkToBeProcessed != null) {
994 sinksToBeProcessed.add(sinkToBeProcessed);
995 }
Pier3bb1f3f2018-04-17 15:50:43 +0200996 }
Pierb0328e42018-03-27 11:29:42 -0700997 });
998 return sinksToBeProcessed;
999 }
1000
1001 /**
1002 * Process all the sinks related to a mcast group and return
1003 * the ones to be processed.
1004 *
1005 * @param source the source connect point
1006 * @param mcastIp the group address
1007 * @param sinks the sinks to be evaluated
1008 * @return the set of the sinks to be processed
1009 */
1010 private Set<ConnectPoint> processSinksToBeAdded(ConnectPoint source, IpAddress mcastIp,
1011 Map<HostId, Set<ConnectPoint>> sinks) {
Pierb0328e42018-03-27 11:29:42 -07001012 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
1013 sinks.forEach(((hostId, connectPoints) -> {
1014 // If it has more than 2 locations
1015 if (connectPoints.size() > 2 || connectPoints.size() == 0) {
1016 log.debug("Skip {} since sink {} has {} locations",
1017 mcastIp, hostId, connectPoints.size());
1018 return;
1019 }
1020 // If it has one location, just use it
1021 if (connectPoints.size() == 1) {
Pier3e793752018-04-19 16:47:06 +02001022 sinksToBeProcessed.add(connectPoints.stream().findFirst().orElse(null));
Pierb0328e42018-03-27 11:29:42 -07001023 return;
1024 }
1025 // We prefer to reuse existing flows
1026 ConnectPoint sinkToBeProcessed = connectPoints.stream()
Pier3e793752018-04-19 16:47:06 +02001027 .filter(connectPoint -> {
1028 if (!isSinkForGroup(mcastIp, connectPoint, source)) {
1029 return false;
1030 }
1031 if (!isSinkReachable(mcastIp, connectPoint, source)) {
1032 return false;
1033 }
1034 ConnectPoint other = connectPoints.stream()
1035 .filter(remaining -> !remaining.equals(connectPoint))
1036 .findFirst().orElse(null);
1037 // We are already serving the sink
1038 return !isSinkForSource(mcastIp, other, source);
1039 }).findFirst().orElse(null);
1040
Pierb0328e42018-03-27 11:29:42 -07001041 if (sinkToBeProcessed != null) {
1042 sinksToBeProcessed.add(sinkToBeProcessed);
1043 return;
1044 }
1045 // Otherwise we prefer to reuse existing egresses
Pier3e793752018-04-19 16:47:06 +02001046 Set<DeviceId> egresses = getDevice(mcastIp, EGRESS, source);
Pierb0328e42018-03-27 11:29:42 -07001047 sinkToBeProcessed = connectPoints.stream()
Pier3e793752018-04-19 16:47:06 +02001048 .filter(connectPoint -> {
1049 if (!egresses.contains(connectPoint.deviceId())) {
1050 return false;
1051 }
1052 if (!isSinkReachable(mcastIp, connectPoint, source)) {
1053 return false;
1054 }
1055 ConnectPoint other = connectPoints.stream()
1056 .filter(remaining -> !remaining.equals(connectPoint))
1057 .findFirst().orElse(null);
1058 return !isSinkForSource(mcastIp, other, source);
1059 }).findFirst().orElse(null);
Pierb0328e42018-03-27 11:29:42 -07001060 if (sinkToBeProcessed != null) {
1061 sinksToBeProcessed.add(sinkToBeProcessed);
1062 return;
1063 }
1064 // Otherwise we prefer a location co-located with the source (if it exists)
1065 sinkToBeProcessed = connectPoints.stream()
1066 .filter(connectPoint -> connectPoint.deviceId().equals(source.deviceId()))
1067 .findFirst().orElse(null);
1068 if (sinkToBeProcessed != null) {
1069 sinksToBeProcessed.add(sinkToBeProcessed);
1070 return;
1071 }
Pier3e793752018-04-19 16:47:06 +02001072 // Finally, we randomly pick a new location if it is reachable
1073 sinkToBeProcessed = connectPoints.stream()
1074 .filter(connectPoint -> {
1075 if (!isSinkReachable(mcastIp, connectPoint, source)) {
1076 return false;
1077 }
1078 ConnectPoint other = connectPoints.stream()
1079 .filter(remaining -> !remaining.equals(connectPoint))
1080 .findFirst().orElse(null);
1081 return !isSinkForSource(mcastIp, other, source);
1082 }).findFirst().orElse(null);
1083 if (sinkToBeProcessed != null) {
1084 sinksToBeProcessed.add(sinkToBeProcessed);
1085 }
Pierb0328e42018-03-27 11:29:42 -07001086 }));
Pierb0328e42018-03-27 11:29:42 -07001087 return sinksToBeProcessed;
1088 }
1089
1090 /**
Pier37db3692018-03-12 15:00:54 -07001091 * Utility method to remove all the ingress transit ports.
1092 *
1093 * @param mcastIp the group ip
Pier3e793752018-04-19 16:47:06 +02001094 * @param ingressDevices the ingress devices
1095 * @param sources the source connect points
Pier37db3692018-03-12 15:00:54 -07001096 */
Pier3e793752018-04-19 16:47:06 +02001097 private void removeIngressTransitPorts(IpAddress mcastIp, Set<DeviceId> ingressDevices,
1098 Set<ConnectPoint> sources) {
1099 Map<ConnectPoint, Set<PortNumber>> ingressTransitPorts = Maps.newHashMap();
1100 sources.forEach(source -> {
1101 DeviceId ingressDevice = ingressDevices.stream()
1102 .filter(deviceId -> deviceId.equals(source.deviceId()))
1103 .findFirst().orElse(null);
1104 if (ingressDevice == null) {
1105 log.warn("Skip removeIngressTransitPorts - " +
1106 "Missing ingress for source {} and group {}",
1107 source, mcastIp);
1108 return;
Pier37db3692018-03-12 15:00:54 -07001109 }
Pier3e793752018-04-19 16:47:06 +02001110 ingressTransitPorts.put(source, ingressTransitPort(mcastIp, ingressDevice, source));
Pier37db3692018-03-12 15:00:54 -07001111 });
Pier3e793752018-04-19 16:47:06 +02001112 ingressTransitPorts.forEach((source, ports) -> ports.forEach(ingressTransitPort -> {
1113 DeviceId ingressDevice = ingressDevices.stream()
1114 .filter(deviceId -> deviceId.equals(source.deviceId()))
1115 .findFirst().orElse(null);
1116 boolean isLast = removePortFromDevice(ingressDevice, ingressTransitPort,
1117 mcastIp, mcastUtils.assignedVlan(source));
1118 if (isLast) {
1119 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, ingressDevice, source));
1120 }
1121 }));
Pier37db3692018-03-12 15:00:54 -07001122 }
1123
1124 /**
Charles Chand55e84d2016-03-30 17:54:24 -07001125 * Adds a port to given multicast group on given device. This involves the
1126 * update of L3 multicast group and multicast routing table entry.
1127 *
1128 * @param deviceId device ID
1129 * @param port port to be added
1130 * @param mcastIp multicast group
1131 * @param assignedVlan assigned VLAN ID
1132 */
1133 private void addPortToDevice(DeviceId deviceId, PortNumber port,
Pierb0328e42018-03-27 11:29:42 -07001134 IpAddress mcastIp, VlanId assignedVlan) {
Pier3e793752018-04-19 16:47:06 +02001135 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId, assignedVlan);
Charles Chand55e84d2016-03-30 17:54:24 -07001136 ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
Pier Luigi21fffd22018-01-19 10:24:53 +01001137 NextObjective newNextObj;
Charles Chan2199c302016-04-23 17:36:10 -07001138 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Charles Chand55e84d2016-03-30 17:54:24 -07001139 // First time someone request this mcast group via this device
1140 portBuilder.add(port);
Pier Luigi21fffd22018-01-19 10:24:53 +01001141 // New nextObj
Pierb0328e42018-03-27 11:29:42 -07001142 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi21fffd22018-01-19 10:24:53 +01001143 portBuilder.build(), null).add();
1144 // Store the new port
1145 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Charles Chand55e84d2016-03-30 17:54:24 -07001146 } else {
1147 // This device already serves some subscribers of this mcast group
Charles Chan2199c302016-04-23 17:36:10 -07001148 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chand55e84d2016-03-30 17:54:24 -07001149 // Stop if the port is already in the nextobj
Pierb0328e42018-03-27 11:29:42 -07001150 Set<PortNumber> existingPorts = mcastUtils.getPorts(nextObj.next());
Charles Chand55e84d2016-03-30 17:54:24 -07001151 if (existingPorts.contains(port)) {
1152 log.info("NextObj for {}/{} already exists. Abort", deviceId, port);
1153 return;
1154 }
Pier Luigi21fffd22018-01-19 10:24:53 +01001155 // Let's add the port and reuse the previous one
Yuta HIGUCHI0eb68e12018-02-09 18:05:23 -08001156 portBuilder.addAll(existingPorts).add(port);
Pier Luigi21fffd22018-01-19 10:24:53 +01001157 // Reuse previous nextObj
Pierb0328e42018-03-27 11:29:42 -07001158 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi21fffd22018-01-19 10:24:53 +01001159 portBuilder.build(), nextObj.id()).addToExisting();
1160 // Store the final next objective and send only the difference to the driver
1161 mcastNextObjStore.put(mcastStoreKey, newNextObj);
1162 // Add just the new port
1163 portBuilder = ImmutableSet.builder();
1164 portBuilder.add(port);
Pierb0328e42018-03-27 11:29:42 -07001165 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi21fffd22018-01-19 10:24:53 +01001166 portBuilder.build(), nextObj.id()).addToExisting();
Charles Chand55e84d2016-03-30 17:54:24 -07001167 }
1168 // Create, store and apply the new nextObj and fwdObj
Charles Chan2199c302016-04-23 17:36:10 -07001169 ObjectiveContext context = new DefaultObjectiveContext(
1170 (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}",
1171 mcastIp, deviceId, port.toLong(), assignedVlan),
Pier3e793752018-04-19 16:47:06 +02001172 (objective, error) -> log.warn("Failed to add {} on {}/{}, vlan {}: {}",
Charles Chan2199c302016-04-23 17:36:10 -07001173 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Pierb0328e42018-03-27 11:29:42 -07001174 ForwardingObjective fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan,
1175 newNextObj.id()).add(context);
Charles Chand55e84d2016-03-30 17:54:24 -07001176 srManager.flowObjectiveService.next(deviceId, newNextObj);
1177 srManager.flowObjectiveService.forward(deviceId, fwdObj);
Charles Chand55e84d2016-03-30 17:54:24 -07001178 }
1179
1180 /**
1181 * Removes a port from given multicast group on given device.
1182 * This involves the update of L3 multicast group and multicast routing
1183 * table entry.
1184 *
1185 * @param deviceId device ID
1186 * @param port port to be added
1187 * @param mcastIp multicast group
1188 * @param assignedVlan assigned VLAN ID
1189 * @return true if this is the last sink on this device
1190 */
1191 private boolean removePortFromDevice(DeviceId deviceId, PortNumber port,
Pierb0328e42018-03-27 11:29:42 -07001192 IpAddress mcastIp, VlanId assignedVlan) {
Charles Chan2199c302016-04-23 17:36:10 -07001193 McastStoreKey mcastStoreKey =
Pier3e793752018-04-19 16:47:06 +02001194 new McastStoreKey(mcastIp, deviceId, assignedVlan);
Charles Chand55e84d2016-03-30 17:54:24 -07001195 // This device is not serving this multicast group
Charles Chan2199c302016-04-23 17:36:10 -07001196 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Pier3e793752018-04-19 16:47:06 +02001197 return true;
Charles Chand55e84d2016-03-30 17:54:24 -07001198 }
Charles Chan2199c302016-04-23 17:36:10 -07001199 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Pierb0328e42018-03-27 11:29:42 -07001200 Set<PortNumber> existingPorts = mcastUtils.getPorts(nextObj.next());
Charles Chan2199c302016-04-23 17:36:10 -07001201 // This port does not serve this multicast group
Charles Chand55e84d2016-03-30 17:54:24 -07001202 if (!existingPorts.contains(port)) {
Pier3e793752018-04-19 16:47:06 +02001203 if (!existingPorts.isEmpty()) {
1204 log.warn("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
1205 return false;
1206 }
1207 return true;
Charles Chand55e84d2016-03-30 17:54:24 -07001208 }
1209 // Copy and modify the ImmutableSet
1210 existingPorts = Sets.newHashSet(existingPorts);
1211 existingPorts.remove(port);
Charles Chand55e84d2016-03-30 17:54:24 -07001212 NextObjective newNextObj;
Pier Luigid1be7b12018-01-19 10:24:53 +01001213 ObjectiveContext context;
Charles Chand55e84d2016-03-30 17:54:24 -07001214 ForwardingObjective fwdObj;
1215 if (existingPorts.isEmpty()) {
Pier Luigid1be7b12018-01-19 10:24:53 +01001216 context = new DefaultObjectiveContext(
Charles Chan2199c302016-04-23 17:36:10 -07001217 (objective) -> log.debug("Successfully remove {} on {}/{}, vlan {}",
1218 mcastIp, deviceId, port.toLong(), assignedVlan),
Pier3e793752018-04-19 16:47:06 +02001219 (objective, error) -> log.warn("Failed to remove {} on {}/{}, vlan {}: {}",
Charles Chan2199c302016-04-23 17:36:10 -07001220 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Pierb0328e42018-03-27 11:29:42 -07001221 fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
Charles Chan2199c302016-04-23 17:36:10 -07001222 mcastNextObjStore.remove(mcastStoreKey);
Charles Chand55e84d2016-03-30 17:54:24 -07001223 } else {
1224 // If this is not the last sink, update flows and groups
Pier Luigid1be7b12018-01-19 10:24:53 +01001225 context = new DefaultObjectiveContext(
Charles Chan2199c302016-04-23 17:36:10 -07001226 (objective) -> log.debug("Successfully update {} on {}/{}, vlan {}",
1227 mcastIp, deviceId, port.toLong(), assignedVlan),
Pier3e793752018-04-19 16:47:06 +02001228 (objective, error) -> log.warn("Failed to update {} on {}/{}, vlan {}: {}",
Charles Chan2199c302016-04-23 17:36:10 -07001229 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Pier Luigid1be7b12018-01-19 10:24:53 +01001230 // Here we store the next objective with the remaining port
Pierb0328e42018-03-27 11:29:42 -07001231 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigid1be7b12018-01-19 10:24:53 +01001232 existingPorts, nextObj.id()).removeFromExisting();
Pierb0328e42018-03-27 11:29:42 -07001233 fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add(context);
Charles Chan2199c302016-04-23 17:36:10 -07001234 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Charles Chand55e84d2016-03-30 17:54:24 -07001235 }
Pier Luigid1be7b12018-01-19 10:24:53 +01001236 // Let's modify the next objective removing the bucket
Pierb0328e42018-03-27 11:29:42 -07001237 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigid1be7b12018-01-19 10:24:53 +01001238 ImmutableSet.of(port), nextObj.id()).removeFromExisting();
1239 srManager.flowObjectiveService.next(deviceId, newNextObj);
1240 srManager.flowObjectiveService.forward(deviceId, fwdObj);
Charles Chand55e84d2016-03-30 17:54:24 -07001241 return existingPorts.isEmpty();
1242 }
1243
Charles Chan2199c302016-04-23 17:36:10 -07001244 /**
1245 * Removes entire group on given device.
1246 *
1247 * @param deviceId device ID
1248 * @param mcastIp multicast group to be removed
1249 * @param assignedVlan assigned VLAN ID
1250 */
1251 private void removeGroupFromDevice(DeviceId deviceId, IpAddress mcastIp,
Pierb0328e42018-03-27 11:29:42 -07001252 VlanId assignedVlan) {
Pier3e793752018-04-19 16:47:06 +02001253 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId, assignedVlan);
Charles Chan2199c302016-04-23 17:36:10 -07001254 // This device is not serving this multicast group
1255 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
1256 log.warn("{} is not serving {}. Abort.", deviceId, mcastIp);
1257 return;
1258 }
1259 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chan2199c302016-04-23 17:36:10 -07001260 ObjectiveContext context = new DefaultObjectiveContext(
1261 (objective) -> log.debug("Successfully remove {} on {}, vlan {}",
1262 mcastIp, deviceId, assignedVlan),
Pier3e793752018-04-19 16:47:06 +02001263 (objective, error) -> log.warn("Failed to remove {} on {}, vlan {}: {}",
Charles Chan2199c302016-04-23 17:36:10 -07001264 mcastIp, deviceId, assignedVlan, error));
Pierb0328e42018-03-27 11:29:42 -07001265 ForwardingObjective fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
Charles Chan2199c302016-04-23 17:36:10 -07001266 srManager.flowObjectiveService.forward(deviceId, fwdObj);
1267 mcastNextObjStore.remove(mcastStoreKey);
Charles Chan2199c302016-04-23 17:36:10 -07001268 }
1269
Pier Luigieba73a02018-01-16 10:47:50 +01001270 private void installPath(IpAddress mcastIp, ConnectPoint source, Path mcastPath) {
Pier Luigieba73a02018-01-16 10:47:50 +01001271 List<Link> links = mcastPath.links();
Pier37db3692018-03-12 15:00:54 -07001272 // Setup new ingress mcast role
Pier3e793752018-04-19 16:47:06 +02001273 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, links.get(0).src().deviceId(), source),
Pier37db3692018-03-12 15:00:54 -07001274 INGRESS);
Pier Luigieba73a02018-01-16 10:47:50 +01001275 // For each link, modify the next on the source device adding the src port
1276 // and a new filter objective on the destination port
1277 links.forEach(link -> {
1278 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
Pierb0328e42018-03-27 11:29:42 -07001279 mcastUtils.assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
1280 mcastUtils.addFilterToDevice(link.dst().deviceId(), link.dst().port(),
1281 mcastUtils.assignedVlan(null), mcastIp, null);
Pier Luigieba73a02018-01-16 10:47:50 +01001282 });
Pier37db3692018-03-12 15:00:54 -07001283 // Setup mcast role for the transit
1284 links.stream()
1285 .filter(link -> !link.src().deviceId().equals(source.deviceId()))
Pier3e793752018-04-19 16:47:06 +02001286 .forEach(link -> mcastRoleStore.put(new McastRoleStoreKey(mcastIp, link.src().deviceId(), source),
Pier37db3692018-03-12 15:00:54 -07001287 TRANSIT));
Charles Chan2199c302016-04-23 17:36:10 -07001288 }
1289
Charles Chand55e84d2016-03-30 17:54:24 -07001290 /**
Pier3ee24552018-03-14 16:47:32 -07001291 * Go through all the paths, looking for shared links to be used
1292 * in the final path computation.
1293 *
1294 * @param egresses egress devices
1295 * @param availablePaths all the available paths towards the egress
1296 * @return shared links between egress devices
1297 */
1298 private Set<Link> exploreMcastTree(Set<DeviceId> egresses,
1299 Map<DeviceId, List<Path>> availablePaths) {
Pier3ee24552018-03-14 16:47:32 -07001300 int minLength = Integer.MAX_VALUE;
1301 int length;
Pier3ee24552018-03-14 16:47:32 -07001302 List<Path> currentPaths;
1303 // Verify the source can still reach all the egresses
1304 for (DeviceId egress : egresses) {
1305 // From the source we cannot reach all the sinks
Pierb0328e42018-03-27 11:29:42 -07001306 // just continue and let's figure out after
Pier3ee24552018-03-14 16:47:32 -07001307 currentPaths = availablePaths.get(egress);
1308 if (currentPaths.isEmpty()) {
1309 continue;
1310 }
Pier3e793752018-04-19 16:47:06 +02001311 // Get the length of the first one available, update the min length
Pier3ee24552018-03-14 16:47:32 -07001312 length = currentPaths.get(0).links().size();
1313 if (length < minLength) {
1314 minLength = length;
1315 }
Pier Luigif7049c52018-02-23 19:57:40 +01001316 }
Pier3ee24552018-03-14 16:47:32 -07001317 // If there are no paths
1318 if (minLength == Integer.MAX_VALUE) {
1319 return Collections.emptySet();
1320 }
Pier3ee24552018-03-14 16:47:32 -07001321 int index = 0;
Pier3ee24552018-03-14 16:47:32 -07001322 Set<Link> sharedLinks = Sets.newHashSet();
1323 Set<Link> currentSharedLinks;
1324 Set<Link> currentLinks;
Pierb0328e42018-03-27 11:29:42 -07001325 DeviceId egressToRemove = null;
Pier3ee24552018-03-14 16:47:32 -07001326 // Let's find out the shared links
1327 while (index < minLength) {
1328 // Initialize the intersection with the paths related to the first egress
Pier3e793752018-04-19 16:47:06 +02001329 currentPaths = availablePaths.get(egresses.stream().findFirst().orElse(null));
Pier3ee24552018-03-14 16:47:32 -07001330 currentSharedLinks = Sets.newHashSet();
1331 // Iterate over the paths and take the "index" links
1332 for (Path path : currentPaths) {
1333 currentSharedLinks.add(path.links().get(index));
1334 }
1335 // Iterate over the remaining egress
1336 for (DeviceId egress : egresses) {
1337 // Iterate over the paths and take the "index" links
1338 currentLinks = Sets.newHashSet();
1339 for (Path path : availablePaths.get(egress)) {
1340 currentLinks.add(path.links().get(index));
1341 }
1342 // Do intersection
1343 currentSharedLinks = Sets.intersection(currentSharedLinks, currentLinks);
1344 // If there are no shared paths exit and record the device to remove
1345 // we have to retry with a subset of sinks
1346 if (currentSharedLinks.isEmpty()) {
Pierb0328e42018-03-27 11:29:42 -07001347 egressToRemove = egress;
Pier3ee24552018-03-14 16:47:32 -07001348 index = minLength;
1349 break;
1350 }
1351 }
1352 sharedLinks.addAll(currentSharedLinks);
1353 index++;
1354 }
Pier3e793752018-04-19 16:47:06 +02001355 // If the shared links is empty and there are egress let's retry another time with less sinks,
1356 // we can still build optimal subtrees
Pierb0328e42018-03-27 11:29:42 -07001357 if (sharedLinks.isEmpty() && egresses.size() > 1 && egressToRemove != null) {
1358 egresses.remove(egressToRemove);
Pier3ee24552018-03-14 16:47:32 -07001359 sharedLinks = exploreMcastTree(egresses, availablePaths);
1360 }
1361 return sharedLinks;
1362 }
1363
1364 /**
1365 * Build Mcast tree having as root the given source and as leaves the given egress points.
1366 *
1367 * @param source source of the tree
1368 * @param sinks leaves of the tree
1369 * @return the computed Mcast tree
1370 */
1371 private Map<ConnectPoint, List<Path>> computeSinkMcastTree(DeviceId source,
Pierb0328e42018-03-27 11:29:42 -07001372 Set<ConnectPoint> sinks) {
Pier3ee24552018-03-14 16:47:32 -07001373 // Get the egress devices, remove source from the egress if present
Pier3e793752018-04-19 16:47:06 +02001374 Set<DeviceId> egresses = sinks.stream().map(ConnectPoint::deviceId)
1375 .filter(deviceId -> !deviceId.equals(source)).collect(Collectors.toSet());
Pier3ee24552018-03-14 16:47:32 -07001376 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(source, egresses);
Pier3ee24552018-03-14 16:47:32 -07001377 final Map<ConnectPoint, List<Path>> finalTree = Maps.newHashMap();
Pierb0328e42018-03-27 11:29:42 -07001378 // We need to put back the source if it was originally present
1379 sinks.forEach(sink -> {
1380 List<Path> sinkPaths = mcastTree.get(sink.deviceId());
1381 finalTree.put(sink, sinkPaths != null ? sinkPaths : ImmutableList.of());
1382 });
Pier3ee24552018-03-14 16:47:32 -07001383 return finalTree;
1384 }
1385
1386 /**
1387 * Build Mcast tree having as root the given source and as leaves the given egress.
1388 *
1389 * @param source source of the tree
1390 * @param egresses leaves of the tree
1391 * @return the computed Mcast tree
1392 */
1393 private Map<DeviceId, List<Path>> computeMcastTree(DeviceId source,
1394 Set<DeviceId> egresses) {
1395 // Pre-compute all the paths
1396 Map<DeviceId, List<Path>> availablePaths = Maps.newHashMap();
Pier3ee24552018-03-14 16:47:32 -07001397 egresses.forEach(egress -> availablePaths.put(egress, getPaths(source, egress,
1398 Collections.emptySet())));
1399 // Explore the topology looking for shared links amongst the egresses
1400 Set<Link> linksToEnforce = exploreMcastTree(Sets.newHashSet(egresses), availablePaths);
Pier3ee24552018-03-14 16:47:32 -07001401 // Build the final paths enforcing the shared links between egress devices
Pier3e793752018-04-19 16:47:06 +02001402 availablePaths.clear();
Pier3ee24552018-03-14 16:47:32 -07001403 egresses.forEach(egress -> availablePaths.put(egress, getPaths(source, egress,
1404 linksToEnforce)));
1405 return availablePaths;
1406 }
1407
1408 /**
1409 * Gets path from src to dst computed using the custom link weigher.
1410 *
1411 * @param src source device ID
1412 * @param dst destination device ID
1413 * @return list of paths from src to dst
1414 */
1415 private List<Path> getPaths(DeviceId src, DeviceId dst, Set<Link> linksToEnforce) {
Pier3ee24552018-03-14 16:47:32 -07001416 final Topology currentTopology = topologyService.currentTopology();
Pier3ee24552018-03-14 16:47:32 -07001417 final LinkWeigher linkWeigher = new SRLinkWeigher(srManager, src, linksToEnforce);
Pier3e793752018-04-19 16:47:06 +02001418 List<Path> allPaths = Lists.newArrayList(topologyService.getPaths(currentTopology, src, dst, linkWeigher));
Pier3ee24552018-03-14 16:47:32 -07001419 log.debug("{} path(s) found from {} to {}", allPaths.size(), src, dst);
1420 return allPaths;
Pier Luigif7049c52018-02-23 19:57:40 +01001421 }
1422
Charles Chand55e84d2016-03-30 17:54:24 -07001423 /**
1424 * Gets a path from src to dst.
1425 * If a path was allocated before, returns the allocated path.
1426 * Otherwise, randomly pick one from available paths.
1427 *
1428 * @param src source device ID
1429 * @param dst destination device ID
1430 * @param mcastIp multicast group
Pier3ee24552018-03-14 16:47:32 -07001431 * @param allPaths paths list
Charles Chand55e84d2016-03-30 17:54:24 -07001432 * @return an optional path from src to dst
1433 */
Pier3e793752018-04-19 16:47:06 +02001434 private Optional<Path> getPath(DeviceId src, DeviceId dst, IpAddress mcastIp,
1435 List<Path> allPaths, ConnectPoint source) {
Pier3ee24552018-03-14 16:47:32 -07001436 if (allPaths == null) {
1437 allPaths = getPaths(src, dst, Collections.emptySet());
1438 }
Charles Chand55e84d2016-03-30 17:54:24 -07001439 if (allPaths.isEmpty()) {
Charles Chand55e84d2016-03-30 17:54:24 -07001440 return Optional.empty();
1441 }
Pier3e793752018-04-19 16:47:06 +02001442 // Create a map index of suitability-to-list of paths. For example
Pier Luigibad6d6c2018-01-23 16:06:38 +01001443 // a path in the list associated to the index 1 shares only the
1444 // first hop and it is less suitable of a path belonging to the index
1445 // 2 that shares leaf-spine.
1446 Map<Integer, List<Path>> eligiblePaths = Maps.newHashMap();
Pier Luigibad6d6c2018-01-23 16:06:38 +01001447 int nhop;
1448 McastStoreKey mcastStoreKey;
Pier Luigibad6d6c2018-01-23 16:06:38 +01001449 PortNumber srcPort;
1450 Set<PortNumber> existingPorts;
1451 NextObjective nextObj;
Pier Luigibad6d6c2018-01-23 16:06:38 +01001452 for (Path path : allPaths) {
Pier Luigibad6d6c2018-01-23 16:06:38 +01001453 if (!src.equals(path.links().get(0).src().deviceId())) {
1454 continue;
1455 }
1456 nhop = 0;
1457 // Iterate over the links
Pier3e793752018-04-19 16:47:06 +02001458 for (Link hop : path.links()) {
1459 VlanId assignedVlan = mcastUtils.assignedVlan(hop.src().deviceId().equals(src) ?
1460 source : null);
1461 mcastStoreKey = new McastStoreKey(mcastIp, hop.src().deviceId(), assignedVlan);
1462 // It does not exist in the store, go to the next link
Pier Luigibad6d6c2018-01-23 16:06:38 +01001463 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Pier3e793752018-04-19 16:47:06 +02001464 continue;
Charles Chand55e84d2016-03-30 17:54:24 -07001465 }
Pier Luigibad6d6c2018-01-23 16:06:38 +01001466 nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Pierb0328e42018-03-27 11:29:42 -07001467 existingPorts = mcastUtils.getPorts(nextObj.next());
Pier Luigibad6d6c2018-01-23 16:06:38 +01001468 srcPort = hop.src().port();
Pier3e793752018-04-19 16:47:06 +02001469 // the src port is not used as output, go to the next link
Pier Luigibad6d6c2018-01-23 16:06:38 +01001470 if (!existingPorts.contains(srcPort)) {
Pier3e793752018-04-19 16:47:06 +02001471 continue;
Pier Luigibad6d6c2018-01-23 16:06:38 +01001472 }
1473 nhop++;
1474 }
1475 // n_hop defines the index
1476 if (nhop > 0) {
1477 eligiblePaths.compute(nhop, (index, paths) -> {
1478 paths = paths == null ? Lists.newArrayList() : paths;
1479 paths.add(path);
1480 return paths;
1481 });
Charles Chand55e84d2016-03-30 17:54:24 -07001482 }
1483 }
Pier Luigibad6d6c2018-01-23 16:06:38 +01001484 if (eligiblePaths.isEmpty()) {
1485 log.debug("No eligiblePath(s) found from {} to {}", src, dst);
Pier Luigibad6d6c2018-01-23 16:06:38 +01001486 Collections.shuffle(allPaths);
1487 return allPaths.stream().findFirst();
1488 }
Pier Luigibad6d6c2018-01-23 16:06:38 +01001489 // Let's take the best ones
Pier3e793752018-04-19 16:47:06 +02001490 Integer bestIndex = eligiblePaths.keySet().stream()
1491 .sorted(Comparator.reverseOrder()).findFirst().orElse(null);
Pier Luigibad6d6c2018-01-23 16:06:38 +01001492 List<Path> bestPaths = eligiblePaths.get(bestIndex);
1493 log.debug("{} eligiblePath(s) found from {} to {}",
1494 bestPaths.size(), src, dst);
Pier Luigibad6d6c2018-01-23 16:06:38 +01001495 Collections.shuffle(bestPaths);
1496 return bestPaths.stream().findFirst();
Charles Chand55e84d2016-03-30 17:54:24 -07001497 }
1498
1499 /**
Pier3e793752018-04-19 16:47:06 +02001500 * Gets device(s) of given role and of given source in given multicast tree.
1501 *
1502 * @param mcastIp multicast IP
1503 * @param role multicast role
1504 * @param source source connect point
1505 * @return set of device ID or empty set if not found
1506 */
1507 private Set<DeviceId> getDevice(IpAddress mcastIp, McastRole role, ConnectPoint source) {
1508 return mcastRoleStore.entrySet().stream()
1509 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
1510 entry.getKey().source().equals(source) &&
1511 entry.getValue().value() == role)
1512 .map(Entry::getKey).map(McastRoleStoreKey::deviceId).collect(Collectors.toSet());
1513 }
1514
1515 /**
Charles Chan2199c302016-04-23 17:36:10 -07001516 * Gets device(s) of given role in given multicast group.
1517 *
1518 * @param mcastIp multicast IP
1519 * @param role multicast role
1520 * @return set of device ID or empty set if not found
1521 */
1522 private Set<DeviceId> getDevice(IpAddress mcastIp, McastRole role) {
1523 return mcastRoleStore.entrySet().stream()
1524 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
1525 entry.getValue().value() == role)
Pier3e793752018-04-19 16:47:06 +02001526 .map(Entry::getKey).map(McastRoleStoreKey::deviceId).collect(Collectors.toSet());
1527 }
1528
1529 /**
1530 * Gets source(s) of given role, given device in given multicast group.
1531 *
1532 * @param mcastIp multicast IP
1533 * @param deviceId device id
1534 * @param role multicast role
1535 * @return set of device ID or empty set if not found
1536 */
1537 private Set<ConnectPoint> getSources(IpAddress mcastIp, DeviceId deviceId, McastRole role) {
1538 return mcastRoleStore.entrySet().stream()
1539 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
1540 entry.getKey().deviceId().equals(deviceId) && entry.getValue().value() == role)
1541 .map(Entry::getKey).map(McastRoleStoreKey::source).collect(Collectors.toSet());
1542 }
1543
1544 /**
1545 * Gets source(s) of given multicast group.
1546 *
1547 * @param mcastIp multicast IP
1548 * @return set of device ID or empty set if not found
1549 */
1550 private Set<ConnectPoint> getSources(IpAddress mcastIp) {
1551 return mcastRoleStore.entrySet().stream()
1552 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp))
1553 .map(Entry::getKey).map(McastRoleStoreKey::source).collect(Collectors.toSet());
Charles Chan2199c302016-04-23 17:36:10 -07001554 }
1555
1556 /**
1557 * Gets groups which is affected by the link down event.
1558 *
1559 * @param link link going down
1560 * @return a set of multicast IpAddress
1561 */
1562 private Set<IpAddress> getAffectedGroups(Link link) {
1563 DeviceId deviceId = link.src().deviceId();
1564 PortNumber port = link.src().port();
1565 return mcastNextObjStore.entrySet().stream()
1566 .filter(entry -> entry.getKey().deviceId().equals(deviceId) &&
Pier3e793752018-04-19 16:47:06 +02001567 mcastUtils.getPorts(entry.getValue().value().next()).contains(port))
1568 .map(Entry::getKey).map(McastStoreKey::mcastIp).collect(Collectors.toSet());
Charles Chan2199c302016-04-23 17:36:10 -07001569 }
1570
1571 /**
Pier Luigieba73a02018-01-16 10:47:50 +01001572 * Gets groups which are affected by the device down event.
1573 *
1574 * @param deviceId device going down
1575 * @return a set of multicast IpAddress
1576 */
1577 private Set<IpAddress> getAffectedGroups(DeviceId deviceId) {
1578 return mcastNextObjStore.entrySet().stream()
1579 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
Pier3ee24552018-03-14 16:47:32 -07001580 .map(Entry::getKey).map(McastStoreKey::mcastIp)
Pier Luigieba73a02018-01-16 10:47:50 +01001581 .collect(Collectors.toSet());
1582 }
1583
1584 /**
Charles Chan2199c302016-04-23 17:36:10 -07001585 * Gets the spine-facing port on ingress device of given multicast group.
1586 *
1587 * @param mcastIp multicast IP
Pier3e793752018-04-19 16:47:06 +02001588 * @param ingressDevice the ingress device
1589 * @param source the source connect point
Charles Chan2199c302016-04-23 17:36:10 -07001590 * @return spine-facing port on ingress device
1591 */
Pier3e793752018-04-19 16:47:06 +02001592 private Set<PortNumber> ingressTransitPort(IpAddress mcastIp, DeviceId ingressDevice,
1593 ConnectPoint source) {
Pier37db3692018-03-12 15:00:54 -07001594 ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
Charles Chan2199c302016-04-23 17:36:10 -07001595 if (ingressDevice != null) {
Pier3e793752018-04-19 16:47:06 +02001596 NextObjective nextObj = mcastNextObjStore.get(new McastStoreKey(mcastIp, ingressDevice,
1597 mcastUtils.assignedVlan(source))).value();
Pierb0328e42018-03-27 11:29:42 -07001598 Set<PortNumber> ports = mcastUtils.getPorts(nextObj.next());
Pier37db3692018-03-12 15:00:54 -07001599 // Let's find out all the ingress-transit ports
Charles Chan2199c302016-04-23 17:36:10 -07001600 for (PortNumber port : ports) {
1601 // Spine-facing port should have no subnet and no xconnect
Pier Luigi96fe0772018-02-28 12:10:50 +01001602 if (srManager.deviceConfiguration() != null &&
1603 srManager.deviceConfiguration().getPortSubnets(ingressDevice, port).isEmpty() &&
Charles Chanfc5c7802016-05-17 13:13:55 -07001604 !srManager.xConnectHandler.hasXConnect(new ConnectPoint(ingressDevice, port))) {
Pier37db3692018-03-12 15:00:54 -07001605 portBuilder.add(port);
Charles Chan2199c302016-04-23 17:36:10 -07001606 }
1607 }
1608 }
Pier37db3692018-03-12 15:00:54 -07001609 return portBuilder.build();
Charles Chan2199c302016-04-23 17:36:10 -07001610 }
Jonghwan Hyun42fe1052017-08-25 17:48:36 -07001611
1612 /**
Pier3bb1f3f2018-04-17 15:50:43 +02001613 * Verify if a given connect point is sink for this group.
1614 *
1615 * @param mcastIp group address
1616 * @param connectPoint connect point to be verified
Pier3e793752018-04-19 16:47:06 +02001617 * @param source source connect point
Pier3bb1f3f2018-04-17 15:50:43 +02001618 * @return true if the connect point is sink of the group
1619 */
Pier3e793752018-04-19 16:47:06 +02001620 private boolean isSinkForGroup(IpAddress mcastIp, ConnectPoint connectPoint,
1621 ConnectPoint source) {
1622 VlanId assignedVlan = mcastUtils.assignedVlan(connectPoint.deviceId().equals(source.deviceId()) ?
1623 source : null);
1624 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, connectPoint.deviceId(), assignedVlan);
Pier3bb1f3f2018-04-17 15:50:43 +02001625 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
1626 return false;
1627 }
Pier3bb1f3f2018-04-17 15:50:43 +02001628 NextObjective mcastNext = mcastNextObjStore.get(mcastStoreKey).value();
1629 return mcastUtils.getPorts(mcastNext.next()).contains(connectPoint.port());
1630 }
1631
1632 /**
Pier3e793752018-04-19 16:47:06 +02001633 * Verify if a given connect point is sink for this group and for this source.
1634 *
1635 * @param mcastIp group address
1636 * @param connectPoint connect point to be verified
1637 * @param source source connect point
1638 * @return true if the connect point is sink of the group
1639 */
1640 private boolean isSinkForSource(IpAddress mcastIp, ConnectPoint connectPoint,
1641 ConnectPoint source) {
1642 boolean isSink = isSinkForGroup(mcastIp, connectPoint, source);
1643 DeviceId device;
1644 if (connectPoint.deviceId().equals(source.deviceId())) {
1645 device = getDevice(mcastIp, INGRESS, source).stream()
1646 .filter(deviceId -> deviceId.equals(connectPoint.deviceId()))
1647 .findFirst().orElse(null);
1648 } else {
1649 device = getDevice(mcastIp, EGRESS, source).stream()
1650 .filter(deviceId -> deviceId.equals(connectPoint.deviceId()))
1651 .findFirst().orElse(null);
1652 }
1653 return isSink && device != null;
1654 }
1655
1656 /**
1657 * Verify if a sink is reachable from this source.
1658 *
1659 * @param mcastIp group address
1660 * @param sink connect point to be verified
1661 * @param source source connect point
1662 * @return true if the connect point is reachable from the source
1663 */
1664 private boolean isSinkReachable(IpAddress mcastIp, ConnectPoint sink,
1665 ConnectPoint source) {
1666 return sink.deviceId().equals(source.deviceId()) ||
1667 getPath(source.deviceId(), sink.deviceId(), mcastIp, null, source).isPresent();
1668 }
1669
1670 /**
Pier Luigib72201b2018-01-25 16:16:02 +01001671 * Updates filtering objective for given device and port.
1672 * It is called in general when the mcast config has been
1673 * changed.
Jonghwan Hyun42fe1052017-08-25 17:48:36 -07001674 *
1675 * @param deviceId device ID
1676 * @param portNum ingress port number
1677 * @param vlanId assigned VLAN ID
1678 * @param install true to add, false to remove
1679 */
Pier Luigi96fe0772018-02-28 12:10:50 +01001680 public void updateFilterToDevice(DeviceId deviceId, PortNumber portNum,
Jonghwan Hyun42fe1052017-08-25 17:48:36 -07001681 VlanId vlanId, boolean install) {
Pier Luigib72201b2018-01-25 16:16:02 +01001682 lastMcastChange = Instant.now();
1683 mcastLock();
1684 try {
Pier3e793752018-04-19 16:47:06 +02001685 // Iterates over the route and updates properly the filtering objective on the source device.
Pier Luigib72201b2018-01-25 16:16:02 +01001686 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
Pier96f63cb2018-04-17 16:29:56 +02001687 log.debug("Update filter for {}", mcastRoute.group());
Pier96f63cb2018-04-17 16:29:56 +02001688 if (!mcastUtils.isLeader(mcastRoute.group())) {
1689 log.debug("Skip {} due to lack of leadership", mcastRoute.group());
1690 return;
1691 }
Pier3e793752018-04-19 16:47:06 +02001692 // Get the sources and for each one update properly the filtering objectives
1693 Set<ConnectPoint> sources = srManager.multicastRouteService.sources(mcastRoute);
1694 sources.forEach(source -> {
1695 if (source.deviceId().equals(deviceId) && source.port().equals(portNum)) {
1696 if (install) {
1697 mcastUtils.addFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), INGRESS);
1698 } else {
1699 mcastUtils.removeFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), null);
1700 }
Pier Luigib72201b2018-01-25 16:16:02 +01001701 }
Pier3e793752018-04-19 16:47:06 +02001702 });
Pier Luigib72201b2018-01-25 16:16:02 +01001703 });
1704 } finally {
1705 mcastUnlock();
1706 }
1707 }
1708
1709 /**
1710 * Performs bucket verification operation for all mcast groups in the devices.
1711 * Firstly, it verifies that mcast is stable before trying verification operation.
1712 * Verification consists in creating new nexts with VERIFY operation. Actually,
1713 * the operation is totally delegated to the driver.
1714 */
Pier3e793752018-04-19 16:47:06 +02001715 private final class McastBucketCorrector implements Runnable {
Pier Luigib72201b2018-01-25 16:16:02 +01001716
1717 @Override
1718 public void run() {
Pier Luigib72201b2018-01-25 16:16:02 +01001719 if (!isMcastStable()) {
1720 return;
Jonghwan Hyun42fe1052017-08-25 17:48:36 -07001721 }
Pier Luigib72201b2018-01-25 16:16:02 +01001722 mcastLock();
1723 try {
1724 // Iterates over the routes and verify the related next objectives
1725 srManager.multicastRouteService.getRoutes()
Pier3e793752018-04-19 16:47:06 +02001726 .stream().map(McastRoute::group)
Pier Luigib72201b2018-01-25 16:16:02 +01001727 .forEach(mcastIp -> {
Pier3e793752018-04-19 16:47:06 +02001728 log.trace("Running mcast buckets corrector for mcast group: {}", mcastIp);
1729 // Verify leadership on the operation
1730 if (!mcastUtils.isLeader(mcastIp)) {
1731 log.trace("Skip {} due to lack of leadership", mcastIp);
1732 return;
1733 }
1734 // Get sources and sinks from Mcast Route Service and warn about errors
1735 Set<ConnectPoint> sources = mcastUtils.getSources(mcastIp);
Pierb0328e42018-03-27 11:29:42 -07001736 Set<ConnectPoint> sinks = mcastUtils.getSinks(mcastIp).values().stream()
Pier3e793752018-04-19 16:47:06 +02001737 .flatMap(Collection::stream).collect(Collectors.toSet());
1738 // Do not proceed if sources of this group are missing
1739 if (sources.isEmpty()) {
Pier Luigib87b8ab2018-03-02 12:53:37 +01001740 if (!sinks.isEmpty()) {
1741 log.warn("Unable to run buckets corrector. " +
Pier3e793752018-04-19 16:47:06 +02001742 "Missing source {} for group {}", sources, mcastIp);
Pier Luigib87b8ab2018-03-02 12:53:37 +01001743 }
Pier Luigib72201b2018-01-25 16:16:02 +01001744 return;
1745 }
Pier3e793752018-04-19 16:47:06 +02001746 sources.forEach(source -> {
1747 // For each group we get current information in the store
1748 // and issue a check of the next objectives in place
1749 Set<DeviceId> ingressDevices = getDevice(mcastIp, INGRESS, source);
1750 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
1751 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
1752 // Do not proceed if ingress devices are missing
1753 if (ingressDevices.isEmpty()) {
1754 if (!sinks.isEmpty()) {
1755 log.warn("Unable to run buckets corrector. " +
1756 "Missing ingress {} for source {} and for group {}",
1757 ingressDevices, source, mcastIp);
1758 }
1759 return;
Pier Luigib72201b2018-01-25 16:16:02 +01001760 }
Pier3e793752018-04-19 16:47:06 +02001761 // Create the set of the devices to be processed
1762 ImmutableSet.Builder<DeviceId> devicesBuilder = ImmutableSet.builder();
1763 if (!ingressDevices.isEmpty()) {
1764 devicesBuilder.addAll(ingressDevices);
1765 }
1766 if (!transitDevices.isEmpty()) {
1767 devicesBuilder.addAll(transitDevices);
1768 }
1769 if (!egressDevices.isEmpty()) {
1770 devicesBuilder.addAll(egressDevices);
1771 }
1772 Set<DeviceId> devicesToProcess = devicesBuilder.build();
1773 devicesToProcess.forEach(deviceId -> {
1774 VlanId assignedVlan = mcastUtils.assignedVlan(deviceId.equals(source.deviceId()) ?
1775 source : null);
1776 McastStoreKey currentKey = new McastStoreKey(mcastIp, deviceId, assignedVlan);
1777 if (mcastNextObjStore.containsKey(currentKey)) {
1778 NextObjective currentNext = mcastNextObjStore.get(currentKey).value();
1779 // Rebuild the next objective using assigned vlan
1780 currentNext = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
1781 mcastUtils.getPorts(currentNext.next()), currentNext.id()).verify();
1782 // Send to the flowobjective service
1783 srManager.flowObjectiveService.next(deviceId, currentNext);
1784 } else {
1785 log.warn("Unable to run buckets corrector. " +
1786 "Missing next for {}, for source {} and for group {}",
1787 deviceId, source, mcastIp);
1788 }
1789 });
Pier Luigib72201b2018-01-25 16:16:02 +01001790 });
Pier Luigib72201b2018-01-25 16:16:02 +01001791 });
1792 } finally {
Pier Luigib72201b2018-01-25 16:16:02 +01001793 mcastUnlock();
1794 }
1795
1796 }
Jonghwan Hyun42fe1052017-08-25 17:48:36 -07001797 }
Pier Luigib29144d2018-01-15 18:06:43 +01001798
Pier3e793752018-04-19 16:47:06 +02001799 /**
1800 * Returns the associated next ids to the mcast groups or to the single
1801 * group if mcastIp is present.
1802 *
1803 * @param mcastIp the group ip
1804 * @return the mapping mcastIp-device to next id
1805 */
Pier Luigib29144d2018-01-15 18:06:43 +01001806 public Map<McastStoreKey, Integer> getMcastNextIds(IpAddress mcastIp) {
Pier Luigib29144d2018-01-15 18:06:43 +01001807 if (mcastIp != null) {
1808 return mcastNextObjStore.entrySet().stream()
1809 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
Pier3e793752018-04-19 16:47:06 +02001810 .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().value().id()));
Pier Luigib29144d2018-01-15 18:06:43 +01001811 }
Pier Luigib29144d2018-01-15 18:06:43 +01001812 return mcastNextObjStore.entrySet().stream()
Pier3e793752018-04-19 16:47:06 +02001813 .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().value().id()));
Pier Luigib29144d2018-01-15 18:06:43 +01001814 }
1815
Pierb1fe7382018-04-17 17:25:22 +02001816 /**
1817 * Returns the associated roles to the mcast groups or to the single
1818 * group if mcastIp is present.
1819 *
1820 * @param mcastIp the group ip
1821 * @return the mapping mcastIp-device to mcast role
1822 *
1823 * @deprecated in 1.12 ("Magpie") release.
1824 */
1825 @Deprecated
Pier Luigi96fe0772018-02-28 12:10:50 +01001826 public Map<McastStoreKey, McastRole> getMcastRoles(IpAddress mcastIp) {
Pier Luigib29144d2018-01-15 18:06:43 +01001827 if (mcastIp != null) {
1828 return mcastRoleStore.entrySet().stream()
1829 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
Pier3e793752018-04-19 16:47:06 +02001830 .collect(Collectors.toMap(entry -> new McastStoreKey(entry.getKey().mcastIp(),
1831 entry.getKey().deviceId(), null), entry -> entry.getValue().value()));
Pier Luigib29144d2018-01-15 18:06:43 +01001832 }
Pier Luigib29144d2018-01-15 18:06:43 +01001833 return mcastRoleStore.entrySet().stream()
Pier3e793752018-04-19 16:47:06 +02001834 .collect(Collectors.toMap(entry -> new McastStoreKey(entry.getKey().mcastIp(),
1835 entry.getKey().deviceId(), null), entry -> entry.getValue().value()));
Pier Luigib29144d2018-01-15 18:06:43 +01001836 }
1837
Pierb1fe7382018-04-17 17:25:22 +02001838 /**
Pier3e793752018-04-19 16:47:06 +02001839 * Returns the associated roles to the mcast groups.
1840 *
1841 * @param mcastIp the group ip
1842 * @param sourcecp the source connect point
1843 * @return the mapping mcastIp-device to mcast role
1844 */
1845 public Map<McastRoleStoreKey, McastRole> getMcastRoles(IpAddress mcastIp,
1846 ConnectPoint sourcecp) {
1847 if (mcastIp != null) {
1848 Map<McastRoleStoreKey, McastRole> roles = mcastRoleStore.entrySet().stream()
1849 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
1850 .collect(Collectors.toMap(entry -> new McastRoleStoreKey(entry.getKey().mcastIp(),
1851 entry.getKey().deviceId(), entry.getKey().source()), entry -> entry.getValue().value()));
1852 if (sourcecp != null) {
1853 roles = roles.entrySet().stream()
1854 .filter(mcastEntry -> sourcecp.equals(mcastEntry.getKey().source()))
1855 .collect(Collectors.toMap(entry -> new McastRoleStoreKey(entry.getKey().mcastIp(),
1856 entry.getKey().deviceId(), entry.getKey().source()), Entry::getValue));
1857 }
1858 return roles;
1859 }
1860 return mcastRoleStore.entrySet().stream()
1861 .collect(Collectors.toMap(entry -> new McastRoleStoreKey(entry.getKey().mcastIp(),
1862 entry.getKey().deviceId(), entry.getKey().source()), entry -> entry.getValue().value()));
1863 }
1864
1865
1866 /**
Pierb1fe7382018-04-17 17:25:22 +02001867 * Returns the associated paths to the mcast group.
1868 *
1869 * @param mcastIp the group ip
1870 * @return the mapping egress point to mcast path
1871 *
1872 * @deprecated in 1.12 ("Magpie") release.
1873 */
1874 @Deprecated
Pier Luigib29144d2018-01-15 18:06:43 +01001875 public Map<ConnectPoint, List<ConnectPoint>> getMcastPaths(IpAddress mcastIp) {
1876 Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
Pierb0328e42018-03-27 11:29:42 -07001877 ConnectPoint source = mcastUtils.getSource(mcastIp);
Pier Luigib29144d2018-01-15 18:06:43 +01001878 if (source != null) {
Pier Luigib29144d2018-01-15 18:06:43 +01001879 Set<DeviceId> visited = Sets.newHashSet();
Pier3e793752018-04-19 16:47:06 +02001880 List<ConnectPoint> currentPath = Lists.newArrayList(source);
1881 buildMcastPaths(source.deviceId(), visited, mcastPaths, currentPath, mcastIp, source);
Pier Luigib29144d2018-01-15 18:06:43 +01001882 }
1883 return mcastPaths;
1884 }
1885
Pierb1fe7382018-04-17 17:25:22 +02001886 /**
1887 * Returns the associated trees to the mcast group.
1888 *
1889 * @param mcastIp the group ip
1890 * @param sourcecp the source connect point
1891 * @return the mapping egress point to mcast path
1892 */
1893 public Multimap<ConnectPoint, List<ConnectPoint>> getMcastTrees(IpAddress mcastIp,
1894 ConnectPoint sourcecp) {
1895 Multimap<ConnectPoint, List<ConnectPoint>> mcastTrees = HashMultimap.create();
Pierb1fe7382018-04-17 17:25:22 +02001896 Set<ConnectPoint> sources = mcastUtils.getSources(mcastIp);
Pierb1fe7382018-04-17 17:25:22 +02001897 if (sourcecp != null) {
1898 sources = sources.stream()
Pier3e793752018-04-19 16:47:06 +02001899 .filter(source -> source.equals(sourcecp)).collect(Collectors.toSet());
Pierb1fe7382018-04-17 17:25:22 +02001900 }
Pierb1fe7382018-04-17 17:25:22 +02001901 if (!sources.isEmpty()) {
1902 sources.forEach(source -> {
Pierb1fe7382018-04-17 17:25:22 +02001903 Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
1904 Set<DeviceId> visited = Sets.newHashSet();
1905 List<ConnectPoint> currentPath = Lists.newArrayList(source);
Pier3e793752018-04-19 16:47:06 +02001906 buildMcastPaths(source.deviceId(), visited, mcastPaths, currentPath, mcastIp, source);
Pierb1fe7382018-04-17 17:25:22 +02001907 mcastPaths.forEach(mcastTrees::put);
1908 });
1909 }
1910 return mcastTrees;
1911 }
1912
1913 /**
1914 * Build recursively the mcast paths.
1915 *
1916 * @param toVisit the node to visit
1917 * @param visited the visited nodes
1918 * @param mcastPaths the current mcast paths
1919 * @param currentPath the current path
1920 * @param mcastIp the group ip
Pier3e793752018-04-19 16:47:06 +02001921 * @param source the source
Pierb1fe7382018-04-17 17:25:22 +02001922 */
Pier Luigib29144d2018-01-15 18:06:43 +01001923 private void buildMcastPaths(DeviceId toVisit, Set<DeviceId> visited,
1924 Map<ConnectPoint, List<ConnectPoint>> mcastPaths,
Pier3e793752018-04-19 16:47:06 +02001925 List<ConnectPoint> currentPath, IpAddress mcastIp,
1926 ConnectPoint source) {
1927 // If we have visited the node to visit there is a loop
Pier Luigib29144d2018-01-15 18:06:43 +01001928 if (visited.contains(toVisit)) {
1929 return;
1930 }
1931 // Visit next-hop
1932 visited.add(toVisit);
Pier3e793752018-04-19 16:47:06 +02001933 VlanId assignedVlan = mcastUtils.assignedVlan(toVisit.equals(source.deviceId()) ? source : null);
1934 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, toVisit, assignedVlan);
Pier Luigib29144d2018-01-15 18:06:43 +01001935 // Looking for next-hops
1936 if (mcastNextObjStore.containsKey(mcastStoreKey)) {
Pier3e793752018-04-19 16:47:06 +02001937 // Build egress connect points, get ports and build relative cps
Pier Luigib29144d2018-01-15 18:06:43 +01001938 NextObjective nextObjective = mcastNextObjStore.get(mcastStoreKey).value();
Pierb0328e42018-03-27 11:29:42 -07001939 Set<PortNumber> outputPorts = mcastUtils.getPorts(nextObjective.next());
Pier Luigib29144d2018-01-15 18:06:43 +01001940 ImmutableSet.Builder<ConnectPoint> cpBuilder = ImmutableSet.builder();
1941 outputPorts.forEach(portNumber -> cpBuilder.add(new ConnectPoint(toVisit, portNumber)));
1942 Set<ConnectPoint> egressPoints = cpBuilder.build();
Pier Luigib29144d2018-01-15 18:06:43 +01001943 Set<Link> egressLinks;
1944 List<ConnectPoint> newCurrentPath;
1945 Set<DeviceId> newVisited;
1946 DeviceId newToVisit;
1947 for (ConnectPoint egressPoint : egressPoints) {
1948 egressLinks = srManager.linkService.getEgressLinks(egressPoint);
1949 // If it does not have egress links, stop
1950 if (egressLinks.isEmpty()) {
1951 // Add the connect points to the path
1952 newCurrentPath = Lists.newArrayList(currentPath);
1953 newCurrentPath.add(0, egressPoint);
Pier Luigib29144d2018-01-15 18:06:43 +01001954 mcastPaths.put(egressPoint, newCurrentPath);
1955 } else {
1956 newVisited = Sets.newHashSet(visited);
1957 // Iterate over the egress links for the next hops
1958 for (Link egressLink : egressLinks) {
Pier Luigib29144d2018-01-15 18:06:43 +01001959 newToVisit = egressLink.dst().deviceId();
Pier Luigib29144d2018-01-15 18:06:43 +01001960 newCurrentPath = Lists.newArrayList(currentPath);
1961 newCurrentPath.add(0, egressPoint);
1962 newCurrentPath.add(0, egressLink.dst());
Pier3e793752018-04-19 16:47:06 +02001963 buildMcastPaths(newToVisit, newVisited, mcastPaths, newCurrentPath, mcastIp, source);
Pier Luigib29144d2018-01-15 18:06:43 +01001964 }
1965 }
1966 }
1967 }
1968 }
1969
Pier96f63cb2018-04-17 16:29:56 +02001970 /**
1971 * Return the leaders of the mcast groups.
1972 *
1973 * @param mcastIp the group ip
1974 * @return the mapping group-node
1975 */
1976 public Map<IpAddress, NodeId> getMcastLeaders(IpAddress mcastIp) {
1977 return mcastUtils.getMcastLeaders(mcastIp);
1978 }
Charles Chand55e84d2016-03-30 17:54:24 -07001979}