blob: 44a600cb8b1336ea22bea8fb7182299c6c9da26d [file] [log] [blame]
Charles Chanc91c8782016-03-30 17:54:24 -07001/*
Pier Luigi69f774d2018-02-28 12:10:50 +01002 * Copyright 2018-present Open Networking Foundation
Charles Chanc91c8782016-03-30 17:54:24 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Pier Luigi69f774d2018-02-28 12:10:50 +010017package org.onosproject.segmentrouting.mcast;
Charles Chanc91c8782016-03-30 17:54:24 -070018
Piere99511d2018-04-19 16:47:06 +020019import com.google.common.base.Objects;
Pier Luigid29ca7c2018-02-28 17:24:03 +010020import com.google.common.cache.Cache;
21import com.google.common.cache.CacheBuilder;
22import com.google.common.cache.RemovalCause;
23import com.google.common.cache.RemovalNotification;
Pier71c55772018-04-17 17:25:22 +020024import com.google.common.collect.HashMultimap;
Pier7b657162018-03-27 11:29:42 -070025import com.google.common.collect.ImmutableList;
Charles Chanc91c8782016-03-30 17:54:24 -070026import com.google.common.collect.ImmutableSet;
27import com.google.common.collect.Lists;
Pier Luigi91573e12018-01-23 16:06:38 +010028import com.google.common.collect.Maps;
Pier71c55772018-04-17 17:25:22 +020029import com.google.common.collect.Multimap;
Charles Chanc91c8782016-03-30 17:54:24 -070030import com.google.common.collect.Sets;
Charles Chanc91c8782016-03-30 17:54:24 -070031import org.onlab.packet.IpAddress;
Charles Chanc91c8782016-03-30 17:54:24 -070032import org.onlab.packet.VlanId;
33import org.onlab.util.KryoNamespace;
Pierdb27b8d2018-04-17 16:29:56 +020034import org.onosproject.cluster.NodeId;
Charles Chanc91c8782016-03-30 17:54:24 -070035import org.onosproject.core.ApplicationId;
36import org.onosproject.core.CoreService;
Pier1f87aca2018-03-14 16:47:32 -070037import org.onosproject.mcast.api.McastEvent;
38import org.onosproject.mcast.api.McastRoute;
Pier7b657162018-03-27 11:29:42 -070039import org.onosproject.mcast.api.McastRouteData;
Pier1f87aca2018-03-14 16:47:32 -070040import org.onosproject.mcast.api.McastRouteUpdate;
Charles Chanba59dd62018-05-10 22:19:49 +000041import org.onosproject.net.HostId;
Charles Chanc91c8782016-03-30 17:54:24 -070042import org.onosproject.net.ConnectPoint;
43import org.onosproject.net.DeviceId;
44import org.onosproject.net.Link;
45import org.onosproject.net.Path;
46import org.onosproject.net.PortNumber;
Charles Chan72779502016-04-23 17:36:10 -070047import org.onosproject.net.flowobjective.DefaultObjectiveContext;
Charles Chanc91c8782016-03-30 17:54:24 -070048import org.onosproject.net.flowobjective.ForwardingObjective;
49import org.onosproject.net.flowobjective.NextObjective;
Charles Chan72779502016-04-23 17:36:10 -070050import org.onosproject.net.flowobjective.ObjectiveContext;
Pier1f87aca2018-03-14 16:47:32 -070051import org.onosproject.net.topology.LinkWeigher;
Pier Luigid8a15162018-02-15 16:33:08 +010052import org.onosproject.net.topology.Topology;
Charles Chanc91c8782016-03-30 17:54:24 -070053import org.onosproject.net.topology.TopologyService;
Pier1f87aca2018-03-14 16:47:32 -070054import org.onosproject.segmentrouting.SRLinkWeigher;
Pier Luigi69f774d2018-02-28 12:10:50 +010055import org.onosproject.segmentrouting.SegmentRoutingManager;
Charles Chanc91c8782016-03-30 17:54:24 -070056import org.onosproject.store.serializers.KryoNamespaces;
57import org.onosproject.store.service.ConsistentMap;
58import org.onosproject.store.service.Serializer;
Andrea Campanella5b4cd652018-06-05 14:19:21 +020059import org.onosproject.store.service.Versioned;
Charles Chanc91c8782016-03-30 17:54:24 -070060import org.slf4j.Logger;
61import org.slf4j.LoggerFactory;
62
Pier Luigi35dab3f2018-01-25 16:16:02 +010063import java.time.Instant;
Charles Chanc91c8782016-03-30 17:54:24 -070064import java.util.Collection;
65import java.util.Collections;
Pier Luigi91573e12018-01-23 16:06:38 +010066import java.util.Comparator;
Charles Chanc91c8782016-03-30 17:54:24 -070067import java.util.List;
Charles Chan72779502016-04-23 17:36:10 -070068import java.util.Map;
Pier1f87aca2018-03-14 16:47:32 -070069import java.util.Map.Entry;
Charles Chanc91c8782016-03-30 17:54:24 -070070import java.util.Optional;
71import java.util.Set;
Pier Luigi35dab3f2018-01-25 16:16:02 +010072import java.util.concurrent.ScheduledExecutorService;
73import java.util.concurrent.TimeUnit;
74import java.util.concurrent.locks.Lock;
75import java.util.concurrent.locks.ReentrantLock;
Charles Chan72779502016-04-23 17:36:10 -070076import java.util.stream.Collectors;
77
Pier Luigi35dab3f2018-01-25 16:16:02 +010078import static java.util.concurrent.Executors.newScheduledThreadPool;
79import static org.onlab.util.Tools.groupedThreads;
Charles Chanba59dd62018-05-10 22:19:49 +000080
Pierdb27b8d2018-04-17 16:29:56 +020081import static org.onosproject.mcast.api.McastEvent.Type.ROUTE_ADDED;
Pier7b657162018-03-27 11:29:42 -070082import static org.onosproject.mcast.api.McastEvent.Type.ROUTE_REMOVED;
Andrea Campanellaef30d7a2018-04-27 14:44:15 +020083import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_ADDED;
84import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_REMOVED;
Charles Chanba59dd62018-05-10 22:19:49 +000085import static org.onosproject.mcast.api.McastEvent.Type.SINKS_ADDED;
86import static org.onosproject.mcast.api.McastEvent.Type.SINKS_REMOVED;
87
Pier979e61a2018-03-07 11:42:50 +010088import static org.onosproject.segmentrouting.mcast.McastRole.EGRESS;
89import static org.onosproject.segmentrouting.mcast.McastRole.INGRESS;
90import static org.onosproject.segmentrouting.mcast.McastRole.TRANSIT;
Charles Chanc91c8782016-03-30 17:54:24 -070091
92/**
Pier Luigi69f774d2018-02-28 12:10:50 +010093 * Handles Multicast related events.
Charles Chanc91c8782016-03-30 17:54:24 -070094 */
Charles Chan1eaf4802016-04-18 13:44:03 -070095public class McastHandler {
96 private static final Logger log = LoggerFactory.getLogger(McastHandler.class);
Charles Chanc91c8782016-03-30 17:54:24 -070097 private final SegmentRoutingManager srManager;
Charles Chan82f19972016-05-17 13:13:55 -070098 private final TopologyService topologyService;
Pierdb27b8d2018-04-17 16:29:56 +020099 private final McastUtils mcastUtils;
Charles Chan72779502016-04-23 17:36:10 -0700100 private final ConsistentMap<McastStoreKey, NextObjective> mcastNextObjStore;
Piere99511d2018-04-19 16:47:06 +0200101 private final ConsistentMap<McastRoleStoreKey, McastRole> mcastRoleStore;
Charles Chan72779502016-04-23 17:36:10 -0700102
Pier Luigid29ca7c2018-02-28 17:24:03 +0100103 // Wait time for the cache
104 private static final int WAIT_TIME_MS = 1000;
Pier7b657162018-03-27 11:29:42 -0700105
Piere99511d2018-04-19 16:47:06 +0200106 //The mcastEventCache is implemented to avoid race condition by giving more time
107 // to the underlying subsystems to process previous calls.
Pier Luigid29ca7c2018-02-28 17:24:03 +0100108 private Cache<McastCacheKey, McastEvent> mcastEventCache = CacheBuilder.newBuilder()
109 .expireAfterWrite(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
110 .removalListener((RemovalNotification<McastCacheKey, McastEvent> notification) -> {
Pier Luigid29ca7c2018-02-28 17:24:03 +0100111 IpAddress mcastIp = notification.getKey().mcastIp();
Pier7b657162018-03-27 11:29:42 -0700112 HostId sink = notification.getKey().sinkHost();
Pier Luigid29ca7c2018-02-28 17:24:03 +0100113 McastEvent mcastEvent = notification.getValue();
114 RemovalCause cause = notification.getCause();
115 log.debug("mcastEventCache removal event. group={}, sink={}, mcastEvent={}, cause={}",
116 mcastIp, sink, mcastEvent, cause);
Piere99511d2018-04-19 16:47:06 +0200117 // If it expires or it has been replaced, we deque the event - no when evicted
Pier Luigid29ca7c2018-02-28 17:24:03 +0100118 switch (notification.getCause()) {
119 case REPLACED:
120 case EXPIRED:
121 dequeueMcastEvent(mcastEvent);
122 break;
123 default:
124 break;
125 }
126 }).build();
127
128 private void enqueueMcastEvent(McastEvent mcastEvent) {
Pier1f87aca2018-03-14 16:47:32 -0700129 final McastRouteUpdate mcastRouteUpdate = mcastEvent.subject();
Pier7b657162018-03-27 11:29:42 -0700130 final McastRouteUpdate mcastRoutePrevUpdate = mcastEvent.prevSubject();
131 final IpAddress group = mcastRoutePrevUpdate.route().group();
Pier7b657162018-03-27 11:29:42 -0700132 ImmutableSet.Builder<HostId> sinksBuilder = ImmutableSet.builder();
Pier1f87aca2018-03-14 16:47:32 -0700133 if (mcastEvent.type() == SOURCES_ADDED ||
134 mcastEvent.type() == SOURCES_REMOVED) {
Piere99511d2018-04-19 16:47:06 +0200135 // Current subject and prev just differ for the source connect points
136 sinksBuilder.addAll(mcastRouteUpdate.sinks().keySet());
Pier7b657162018-03-27 11:29:42 -0700137 } else if (mcastEvent.type() == SINKS_ADDED) {
Pier7b657162018-03-27 11:29:42 -0700138 mcastRouteUpdate.sinks().forEach(((hostId, connectPoints) -> {
139 // Get the previous locations and verify if there are changes
140 Set<ConnectPoint> prevConnectPoints = mcastRoutePrevUpdate.sinks().get(hostId);
141 Set<ConnectPoint> changes = Sets.difference(connectPoints, prevConnectPoints != null ?
142 prevConnectPoints : Collections.emptySet());
143 if (!changes.isEmpty()) {
144 sinksBuilder.add(hostId);
Pier1f87aca2018-03-14 16:47:32 -0700145 }
Pier7b657162018-03-27 11:29:42 -0700146 }));
147 } else if (mcastEvent.type() == SINKS_REMOVED) {
Pier7b657162018-03-27 11:29:42 -0700148 mcastRoutePrevUpdate.sinks().forEach(((hostId, connectPoints) -> {
149 // Get the current locations and verify if there are changes
150 Set<ConnectPoint> currentConnectPoints = mcastRouteUpdate.sinks().get(hostId);
151 Set<ConnectPoint> changes = Sets.difference(connectPoints, currentConnectPoints != null ?
152 currentConnectPoints : Collections.emptySet());
153 if (!changes.isEmpty()) {
154 sinksBuilder.add(hostId);
155 }
156 }));
157 } else if (mcastEvent.type() == ROUTE_REMOVED) {
158 // Current subject is null, just take the previous host ids
159 sinksBuilder.addAll(mcastRoutePrevUpdate.sinks().keySet());
Pier Luigid29ca7c2018-02-28 17:24:03 +0100160 }
Pier Luigid29ca7c2018-02-28 17:24:03 +0100161 sinksBuilder.build().forEach(sink -> {
Pier1f87aca2018-03-14 16:47:32 -0700162 McastCacheKey cacheKey = new McastCacheKey(group, sink);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100163 mcastEventCache.put(cacheKey, mcastEvent);
164 });
165 }
166
167 private void dequeueMcastEvent(McastEvent mcastEvent) {
Pier7b657162018-03-27 11:29:42 -0700168 final McastRouteUpdate mcastUpdate = mcastEvent.subject();
169 final McastRouteUpdate mcastPrevUpdate = mcastEvent.prevSubject();
Pier7b657162018-03-27 11:29:42 -0700170 IpAddress mcastIp = mcastPrevUpdate.route().group();
Pier7b657162018-03-27 11:29:42 -0700171 Set<ConnectPoint> prevSinks = mcastPrevUpdate.sinks()
Piere99511d2018-04-19 16:47:06 +0200172 .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
173 Set<ConnectPoint> prevSources = mcastPrevUpdate.sources()
174 .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
175 Set<ConnectPoint> sources;
Pier Luigid29ca7c2018-02-28 17:24:03 +0100176 switch (mcastEvent.type()) {
Pier1f87aca2018-03-14 16:47:32 -0700177 case SOURCES_ADDED:
Piere99511d2018-04-19 16:47:06 +0200178 sources = mcastUpdate.sources()
179 .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
180 Set<ConnectPoint> sourcesToBeAdded = Sets.difference(sources, prevSources);
181 processSourcesAddedInternal(sourcesToBeAdded, mcastIp, mcastUpdate.sinks());
Pier Luigid29ca7c2018-02-28 17:24:03 +0100182 break;
Pier1f87aca2018-03-14 16:47:32 -0700183 case SOURCES_REMOVED:
Piere99511d2018-04-19 16:47:06 +0200184 sources = mcastUpdate.sources()
185 .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
186 Set<ConnectPoint> sourcesToBeRemoved = Sets.difference(prevSources, sources);
187 processSourcesRemovedInternal(sourcesToBeRemoved, sources, mcastIp, mcastUpdate.sinks());
Pier Luigid29ca7c2018-02-28 17:24:03 +0100188 break;
189 case ROUTE_REMOVED:
Piere99511d2018-04-19 16:47:06 +0200190 processRouteRemovedInternal(prevSources, mcastIp);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100191 break;
Pier1f87aca2018-03-14 16:47:32 -0700192 case SINKS_ADDED:
Piere99511d2018-04-19 16:47:06 +0200193 processSinksAddedInternal(prevSources, mcastIp, mcastUpdate.sinks(), prevSinks);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100194 break;
Pier1f87aca2018-03-14 16:47:32 -0700195 case SINKS_REMOVED:
Piere99511d2018-04-19 16:47:06 +0200196 processSinksRemovedInternal(prevSources, mcastIp, mcastUpdate.sinks(), mcastPrevUpdate.sinks());
Pier Luigid29ca7c2018-02-28 17:24:03 +0100197 break;
198 default:
199 break;
200 }
201 }
202
Pier Luigi35dab3f2018-01-25 16:16:02 +0100203 // Mcast lock to serialize local operations
204 private final Lock mcastLock = new ReentrantLock();
Pier Luigi35dab3f2018-01-25 16:16:02 +0100205 private void mcastLock() {
206 mcastLock.lock();
207 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100208 private void mcastUnlock() {
209 mcastLock.unlock();
210 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100211 // Stability threshold for Mcast. Seconds
212 private static final long MCAST_STABLITY_THRESHOLD = 5;
213 // Last change done
214 private Instant lastMcastChange = Instant.now();
215
216 /**
217 * Determines if mcast in the network has been stable in the last
218 * MCAST_STABLITY_THRESHOLD seconds, by comparing the current time
219 * to the last mcast change timestamp.
220 *
221 * @return true if stable
222 */
223 private boolean isMcastStable() {
224 long last = (long) (lastMcastChange.toEpochMilli() / 1000.0);
225 long now = (long) (Instant.now().toEpochMilli() / 1000.0);
Saurav Das97241862018-02-14 14:14:54 -0800226 log.trace("Mcast stable since {}s", now - last);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100227 return (now - last) > MCAST_STABLITY_THRESHOLD;
228 }
229
Piere99511d2018-04-19 16:47:06 +0200230 // Verify interval for Mcast bucket corrector
Pier Luigi35dab3f2018-01-25 16:16:02 +0100231 private static final long MCAST_VERIFY_INTERVAL = 30;
Piere99511d2018-04-19 16:47:06 +0200232 // Executor for mcast bucket corrector and for cache
Pier Luigi35dab3f2018-01-25 16:16:02 +0100233 private ScheduledExecutorService executorService
Pier Luigid29ca7c2018-02-28 17:24:03 +0100234 = newScheduledThreadPool(1, groupedThreads("mcastWorker", "mcastWorker-%d", log));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100235
Charles Chan72779502016-04-23 17:36:10 -0700236 /**
Charles Chanc91c8782016-03-30 17:54:24 -0700237 * Constructs the McastEventHandler.
238 *
239 * @param srManager Segment Routing manager
240 */
Charles Chan1eaf4802016-04-18 13:44:03 -0700241 public McastHandler(SegmentRoutingManager srManager) {
Pier7b657162018-03-27 11:29:42 -0700242 ApplicationId coreAppId = srManager.coreService.getAppId(CoreService.CORE_APP_NAME);
Charles Chanc91c8782016-03-30 17:54:24 -0700243 this.srManager = srManager;
Charles Chanc91c8782016-03-30 17:54:24 -0700244 this.topologyService = srManager.topologyService;
Pier7b657162018-03-27 11:29:42 -0700245 KryoNamespace.Builder mcastKryo = new KryoNamespace.Builder()
Charles Chanc91c8782016-03-30 17:54:24 -0700246 .register(KryoNamespaces.API)
Piere99511d2018-04-19 16:47:06 +0200247 .register(new McastStoreKeySerializer(), McastStoreKey.class);
Pier7b657162018-03-27 11:29:42 -0700248 mcastNextObjStore = srManager.storageService
Charles Chan72779502016-04-23 17:36:10 -0700249 .<McastStoreKey, NextObjective>consistentMapBuilder()
Charles Chanc91c8782016-03-30 17:54:24 -0700250 .withName("onos-mcast-nextobj-store")
Charles Chan4922a172016-05-23 16:45:45 -0700251 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-NextObj")))
Charles Chanc91c8782016-03-30 17:54:24 -0700252 .build();
Piere99511d2018-04-19 16:47:06 +0200253 mcastKryo = new KryoNamespace.Builder()
254 .register(KryoNamespaces.API)
255 .register(new McastRoleStoreKeySerializer(), McastRoleStoreKey.class)
256 .register(McastRole.class);
Pier7b657162018-03-27 11:29:42 -0700257 mcastRoleStore = srManager.storageService
Piere99511d2018-04-19 16:47:06 +0200258 .<McastRoleStoreKey, McastRole>consistentMapBuilder()
Charles Chan72779502016-04-23 17:36:10 -0700259 .withName("onos-mcast-role-store")
Charles Chan4922a172016-05-23 16:45:45 -0700260 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-Role")))
Charles Chan72779502016-04-23 17:36:10 -0700261 .build();
Pier7b657162018-03-27 11:29:42 -0700262 mcastUtils = new McastUtils(srManager, coreAppId, log);
Piere99511d2018-04-19 16:47:06 +0200263 // Init the executor service, the buckets corrector and schedule the clean up
Pier Luigi35dab3f2018-01-25 16:16:02 +0100264 executorService.scheduleWithFixedDelay(new McastBucketCorrector(), 10,
Pier7b657162018-03-27 11:29:42 -0700265 MCAST_VERIFY_INTERVAL, TimeUnit.SECONDS);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100266 executorService.scheduleAtFixedRate(mcastEventCache::cleanUp, 0,
267 WAIT_TIME_MS, TimeUnit.MILLISECONDS);
Charles Chan72779502016-04-23 17:36:10 -0700268 }
269
270 /**
Piere99511d2018-04-19 16:47:06 +0200271 * Read initial multicast configuration from mcast store.
Charles Chan72779502016-04-23 17:36:10 -0700272 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100273 public void init() {
Pier7b657162018-03-27 11:29:42 -0700274 lastMcastChange = Instant.now();
275 mcastLock();
276 try {
277 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
Piere99511d2018-04-19 16:47:06 +0200278 log.debug("Init group {}", mcastRoute.group());
Pierdb27b8d2018-04-17 16:29:56 +0200279 if (!mcastUtils.isLeader(mcastRoute.group())) {
280 log.debug("Skip {} due to lack of leadership", mcastRoute.group());
281 return;
282 }
Pier7b657162018-03-27 11:29:42 -0700283 McastRouteData mcastRouteData = srManager.multicastRouteService.routeData(mcastRoute);
Piere99511d2018-04-19 16:47:06 +0200284 // For each source process the mcast tree
285 srManager.multicastRouteService.sources(mcastRoute).forEach(source -> {
286 Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
287 Set<DeviceId> visited = Sets.newHashSet();
288 List<ConnectPoint> currentPath = Lists.newArrayList(source);
289 buildMcastPaths(source.deviceId(), visited, mcastPaths,
290 currentPath, mcastRoute.group(), source);
291 // Get all the sinks and process them
292 Set<ConnectPoint> sinks = processSinksToBeAdded(source, mcastRoute.group(),
293 mcastRouteData.sinks());
294 // Filter out all the working sinks, we do not want to move them
295 // TODO we need a better way to distinguish flows coming from different sources
296 sinks = sinks.stream()
297 .filter(sink -> !mcastPaths.containsKey(sink) ||
298 !isSinkForSource(mcastRoute.group(), sink, source))
299 .collect(Collectors.toSet());
300 if (sinks.isEmpty()) {
301 log.debug("Skip {} for source {} nothing to do", mcastRoute.group(), source);
302 return;
303 }
304 Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(source.deviceId(), sinks);
305 mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink,
306 mcastRoute.group(), paths));
307 });
Pier7b657162018-03-27 11:29:42 -0700308 });
309 } finally {
310 mcastUnlock();
311 }
Charles Chanc91c8782016-03-30 17:54:24 -0700312 }
313
314 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +0100315 * Clean up when deactivating the application.
316 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100317 public void terminate() {
Pier72d0e582018-04-20 14:14:34 +0200318 mcastEventCache.invalidateAll();
Pier Luigi35dab3f2018-01-25 16:16:02 +0100319 executorService.shutdown();
Pier72d0e582018-04-20 14:14:34 +0200320 mcastNextObjStore.destroy();
321 mcastRoleStore.destroy();
322 mcastUtils.terminate();
323 log.info("Terminated");
Pier Luigi35dab3f2018-01-25 16:16:02 +0100324 }
325
326 /**
Pier Luigid29ca7c2018-02-28 17:24:03 +0100327 * Processes the SOURCE_ADDED, SOURCE_UPDATED, SINK_ADDED,
Piere99511d2018-04-19 16:47:06 +0200328 * SINK_REMOVED, ROUTE_ADDED and ROUTE_REMOVED events.
Charles Chanc91c8782016-03-30 17:54:24 -0700329 *
330 * @param event McastEvent with SOURCE_ADDED type
331 */
Pier Luigid29ca7c2018-02-28 17:24:03 +0100332 public void processMcastEvent(McastEvent event) {
Charles Chanba59dd62018-05-10 22:19:49 +0000333 log.info("process {}", event);
Pierdb27b8d2018-04-17 16:29:56 +0200334 // If it is a route added, we do not enqueue
335 if (event.type() == ROUTE_ADDED) {
Pierdb27b8d2018-04-17 16:29:56 +0200336 processRouteAddedInternal(event.subject().route().group());
337 } else {
Pierdb27b8d2018-04-17 16:29:56 +0200338 enqueueMcastEvent(event);
339 }
Pier Luigi6786b922018-02-02 16:19:11 +0100340 }
341
342 /**
Piere99511d2018-04-19 16:47:06 +0200343 * Process the SOURCES_ADDED event.
344 *
345 * @param sources the sources connect point
346 * @param mcastIp the group address
347 * @param sinks the sinks connect points
348 */
349 private void processSourcesAddedInternal(Set<ConnectPoint> sources, IpAddress mcastIp,
350 Map<HostId, Set<ConnectPoint>> sinks) {
351 lastMcastChange = Instant.now();
352 mcastLock();
353 try {
354 log.debug("Processing sources added {} for group {}", sources, mcastIp);
355 if (!mcastUtils.isLeader(mcastIp)) {
356 log.debug("Skip {} due to lack of leadership", mcastIp);
357 return;
358 }
359 sources.forEach(source -> {
360 Set<ConnectPoint> sinksToBeAdded = processSinksToBeAdded(source, mcastIp, sinks);
361 Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(source.deviceId(), sinksToBeAdded);
362 mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink, mcastIp, paths));
363 });
364 } finally {
365 mcastUnlock();
366 }
367 }
368
369 /**
370 * Process the SOURCES_REMOVED event.
371 *
372 * @param sourcesToBeRemoved the source connect points to be removed
373 * @param remainingSources the remainig source connect points
374 * @param mcastIp the group address
375 * @param sinks the sinks connect points
376 */
377 private void processSourcesRemovedInternal(Set<ConnectPoint> sourcesToBeRemoved,
378 Set<ConnectPoint> remainingSources,
379 IpAddress mcastIp,
380 Map<HostId, Set<ConnectPoint>> sinks) {
381 lastMcastChange = Instant.now();
382 mcastLock();
383 try {
384 log.debug("Processing sources removed {} for group {}", sourcesToBeRemoved, mcastIp);
385 if (!mcastUtils.isLeader(mcastIp)) {
386 log.debug("Skip {} due to lack of leadership", mcastIp);
387 return;
388 }
389 if (remainingSources.isEmpty()) {
390 processRouteRemovedInternal(sourcesToBeRemoved, mcastIp);
391 return;
392 }
393 // Skip offline devices
394 Set<ConnectPoint> candidateSources = sourcesToBeRemoved.stream()
395 .filter(source -> srManager.deviceService.isAvailable(source.deviceId()))
396 .collect(Collectors.toSet());
397 if (candidateSources.isEmpty()) {
398 log.debug("Skip {} due to empty sources to be removed", mcastIp);
399 return;
400 }
401 Set<Link> remainingLinks = Sets.newHashSet();
402 Map<ConnectPoint, Set<Link>> candidateLinks = Maps.newHashMap();
403 Map<ConnectPoint, Set<ConnectPoint>> candidateSinks = Maps.newHashMap();
404 Set<ConnectPoint> totalSources = Sets.newHashSet(candidateSources);
405 totalSources.addAll(remainingSources);
406 // Calculate all the links used by the sources
407 totalSources.forEach(source -> {
408 Set<ConnectPoint> currentSinks = sinks.values()
409 .stream().flatMap(Collection::stream)
410 .filter(sink -> isSinkForSource(mcastIp, sink, source))
411 .collect(Collectors.toSet());
412 candidateSinks.put(source, currentSinks);
413 currentSinks.forEach(currentSink -> {
414 Optional<Path> currentPath = getPath(source.deviceId(), currentSink.deviceId(),
415 mcastIp, null, source);
416 if (currentPath.isPresent()) {
417 if (!candidateSources.contains(source)) {
418 remainingLinks.addAll(currentPath.get().links());
419 } else {
420 candidateLinks.put(source, Sets.newHashSet(currentPath.get().links()));
421 }
422 }
423 });
424 });
425 // Clean transit links
426 candidateLinks.forEach((source, currentCandidateLinks) -> {
427 Set<Link> linksToBeRemoved = Sets.difference(currentCandidateLinks, remainingLinks)
428 .immutableCopy();
429 if (!linksToBeRemoved.isEmpty()) {
430 currentCandidateLinks.forEach(link -> {
431 DeviceId srcLink = link.src().deviceId();
432 // Remove ports only on links to be removed
433 if (linksToBeRemoved.contains(link)) {
434 removePortFromDevice(link.src().deviceId(), link.src().port(), mcastIp,
435 mcastUtils.assignedVlan(srcLink.equals(source.deviceId()) ?
436 source : null));
437 }
438 // Remove role on the candidate links
439 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, srcLink, source));
440 });
441 }
442 });
443 // Clean ingress and egress
444 candidateSources.forEach(source -> {
445 Set<ConnectPoint> currentSinks = candidateSinks.get(source);
446 currentSinks.forEach(currentSink -> {
447 VlanId assignedVlan = mcastUtils.assignedVlan(source.deviceId().equals(currentSink.deviceId()) ?
448 source : null);
449 // Sinks co-located with the source
450 if (source.deviceId().equals(currentSink.deviceId())) {
451 if (source.port().equals(currentSink.port())) {
452 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
453 mcastIp, currentSink, source);
454 return;
455 }
456 // We need to check against the other sources and if it is
457 // necessary remove the port from the device - no overlap
458 Set<VlanId> otherVlans = remainingSources.stream()
459 // Only sources co-located and having this sink
460 .filter(remainingSource -> remainingSource.deviceId()
461 .equals(source.deviceId()) && candidateSinks.get(remainingSource)
462 .contains(currentSink))
463 .map(remainingSource -> mcastUtils.assignedVlan(
464 remainingSource.deviceId().equals(currentSink.deviceId()) ?
465 remainingSource : null)).collect(Collectors.toSet());
466 if (!otherVlans.contains(assignedVlan)) {
467 removePortFromDevice(currentSink.deviceId(), currentSink.port(),
468 mcastIp, assignedVlan);
469 }
470 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, currentSink.deviceId(),
471 source));
472 return;
473 }
474 Set<VlanId> otherVlans = remainingSources.stream()
475 .filter(remainingSource -> candidateSinks.get(remainingSource)
476 .contains(currentSink))
477 .map(remainingSource -> mcastUtils.assignedVlan(
478 remainingSource.deviceId().equals(currentSink.deviceId()) ?
479 remainingSource : null)).collect(Collectors.toSet());
480 // Sinks on other leaves
481 if (!otherVlans.contains(assignedVlan)) {
482 removePortFromDevice(currentSink.deviceId(), currentSink.port(),
483 mcastIp, assignedVlan);
484 }
485 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, currentSink.deviceId(),
486 source));
487 });
488 });
489 } finally {
490 mcastUnlock();
491 }
492 }
493
494 /**
Pierdb27b8d2018-04-17 16:29:56 +0200495 * Process the ROUTE_ADDED event.
Pier Luigie80d6b42018-02-26 12:31:38 +0100496 *
Pierdb27b8d2018-04-17 16:29:56 +0200497 * @param mcastIp the group address
Pier Luigie80d6b42018-02-26 12:31:38 +0100498 */
Pierdb27b8d2018-04-17 16:29:56 +0200499 private void processRouteAddedInternal(IpAddress mcastIp) {
Pier Luigie80d6b42018-02-26 12:31:38 +0100500 lastMcastChange = Instant.now();
501 mcastLock();
502 try {
Pierdb27b8d2018-04-17 16:29:56 +0200503 log.debug("Processing route added for group {}", mcastIp);
504 // Just elect a new leader
505 mcastUtils.isLeader(mcastIp);
Pier Luigie80d6b42018-02-26 12:31:38 +0100506 } finally {
507 mcastUnlock();
508 }
509 }
510
511 /**
Pier Luigi6786b922018-02-02 16:19:11 +0100512 * Removes the entire mcast tree related to this group.
Piere99511d2018-04-19 16:47:06 +0200513 * @param sources the source connect points
Pier Luigi6786b922018-02-02 16:19:11 +0100514 * @param mcastIp multicast group IP address
515 */
Piere99511d2018-04-19 16:47:06 +0200516 private void processRouteRemovedInternal(Set<ConnectPoint> sources, IpAddress mcastIp) {
Pier Luigi6786b922018-02-02 16:19:11 +0100517 lastMcastChange = Instant.now();
518 mcastLock();
519 try {
Pier Luigie80d6b42018-02-26 12:31:38 +0100520 log.debug("Processing route removed for group {}", mcastIp);
Pierdb27b8d2018-04-17 16:29:56 +0200521 if (!mcastUtils.isLeader(mcastIp)) {
522 log.debug("Skip {} due to lack of leadership", mcastIp);
523 mcastUtils.withdrawLeader(mcastIp);
524 return;
525 }
Piere99511d2018-04-19 16:47:06 +0200526 sources.forEach(source -> {
527 // Find out the ingress, transit and egress device of the affected group
528 DeviceId ingressDevice = getDevice(mcastIp, INGRESS, source)
529 .stream().findFirst().orElse(null);
530 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
531 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
532 // If there are no egress and transit devices, sinks could be only on the ingress
533 if (!egressDevices.isEmpty()) {
534 egressDevices.forEach(deviceId -> {
535 removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null));
536 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, deviceId, source));
537 });
538 }
539 if (!transitDevices.isEmpty()) {
540 transitDevices.forEach(deviceId -> {
541 removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null));
542 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, deviceId, source));
543 });
544 }
545 if (ingressDevice != null) {
546 removeGroupFromDevice(ingressDevice, mcastIp, mcastUtils.assignedVlan(source));
547 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, ingressDevice, source));
548 }
549 });
550 // Finally, withdraw the leadership
551 mcastUtils.withdrawLeader(mcastIp);
Pier Luigi6786b922018-02-02 16:19:11 +0100552 } finally {
553 mcastUnlock();
554 }
555 }
556
Pier7b657162018-03-27 11:29:42 -0700557 /**
558 * Process sinks to be removed.
559 *
Piere99511d2018-04-19 16:47:06 +0200560 * @param sources the source connect points
Pier7b657162018-03-27 11:29:42 -0700561 * @param mcastIp the ip address of the group
562 * @param newSinks the new sinks to be processed
Pier28164682018-04-17 15:50:43 +0200563 * @param prevSinks the previous sinks
Pier7b657162018-03-27 11:29:42 -0700564 */
Piere99511d2018-04-19 16:47:06 +0200565 private void processSinksRemovedInternal(Set<ConnectPoint> sources, IpAddress mcastIp,
Pier7b657162018-03-27 11:29:42 -0700566 Map<HostId, Set<ConnectPoint>> newSinks,
Pier28164682018-04-17 15:50:43 +0200567 Map<HostId, Set<ConnectPoint>> prevSinks) {
Pier7b657162018-03-27 11:29:42 -0700568 lastMcastChange = Instant.now();
569 mcastLock();
Pier7b657162018-03-27 11:29:42 -0700570 try {
Pierdb27b8d2018-04-17 16:29:56 +0200571 if (!mcastUtils.isLeader(mcastIp)) {
572 log.debug("Skip {} due to lack of leadership", mcastIp);
573 return;
574 }
Piere99511d2018-04-19 16:47:06 +0200575 log.debug("Processing sinks removed for group {} and for sources {}",
576 mcastIp, sources);
577 Map<ConnectPoint, Map<ConnectPoint, Optional<Path>>> treesToBeRemoved = Maps.newHashMap();
578 Map<ConnectPoint, Set<ConnectPoint>> treesToBeAdded = Maps.newHashMap();
579 sources.forEach(source -> {
580 // Save the path associated to the sinks to be removed
581 Set<ConnectPoint> sinksToBeRemoved = processSinksToBeRemoved(mcastIp, prevSinks,
582 newSinks, source);
583 Map<ConnectPoint, Optional<Path>> treeToBeRemoved = Maps.newHashMap();
584 sinksToBeRemoved.forEach(sink -> treeToBeRemoved.put(sink, getPath(source.deviceId(),
585 sink.deviceId(), mcastIp,
586 null, source)));
587 treesToBeRemoved.put(source, treeToBeRemoved);
588 // Recover the dual-homed sinks
589 Set<ConnectPoint> sinksToBeRecovered = processSinksToBeRecovered(mcastIp, newSinks,
590 prevSinks, source);
591 treesToBeAdded.put(source, sinksToBeRecovered);
592 });
593 // Remove the sinks taking into account the multiple sources and the original paths
594 treesToBeRemoved.forEach((source, tree) ->
595 tree.forEach((sink, path) -> processSinkRemovedInternal(source, sink, mcastIp, path)));
596 // Add new sinks according to the recovery procedure
597 treesToBeAdded.forEach((source, sinks) ->
598 sinks.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null)));
Pier7b657162018-03-27 11:29:42 -0700599 } finally {
600 mcastUnlock();
Pier7b657162018-03-27 11:29:42 -0700601 }
602 }
603
Pier Luigi6786b922018-02-02 16:19:11 +0100604 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +0100605 * Removes a path from source to sink for given multicast group.
606 *
607 * @param source connect point of the multicast source
608 * @param sink connection point of the multicast sink
609 * @param mcastIp multicast group IP address
Piere99511d2018-04-19 16:47:06 +0200610 * @param mcastPath path associated to the sink
Pier Luigi35dab3f2018-01-25 16:16:02 +0100611 */
612 private void processSinkRemovedInternal(ConnectPoint source, ConnectPoint sink,
Piere99511d2018-04-19 16:47:06 +0200613 IpAddress mcastIp, Optional<Path> mcastPath) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100614 lastMcastChange = Instant.now();
615 mcastLock();
616 try {
Piere99511d2018-04-19 16:47:06 +0200617 log.debug("Processing sink removed {} for group {} and for source {}", sink, mcastIp, source);
Pier7b657162018-03-27 11:29:42 -0700618 boolean isLast;
Pier Luigi35dab3f2018-01-25 16:16:02 +0100619 // When source and sink are on the same device
620 if (source.deviceId().equals(sink.deviceId())) {
621 // Source and sink are on even the same port. There must be something wrong.
622 if (source.port().equals(sink.port())) {
Piere99511d2018-04-19 16:47:06 +0200623 log.warn("Skip {} since sink {} is on the same port of source {}. Abort", mcastIp, sink, source);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100624 return;
625 }
Pier7b657162018-03-27 11:29:42 -0700626 isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
Pier Luigi92e69be2018-03-02 12:53:37 +0100627 if (isLast) {
Piere99511d2018-04-19 16:47:06 +0200628 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, sink.deviceId(), source));
Pier Luigi92e69be2018-03-02 12:53:37 +0100629 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100630 return;
631 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100632 // Process the egress device
Pier7b657162018-03-27 11:29:42 -0700633 isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100634 if (isLast) {
Piere99511d2018-04-19 16:47:06 +0200635 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, sink.deviceId(), source));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100636 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100637 // If this is the last sink on the device, also update upstream
Pier Luigi35dab3f2018-01-25 16:16:02 +0100638 if (mcastPath.isPresent()) {
639 List<Link> links = Lists.newArrayList(mcastPath.get().links());
640 Collections.reverse(links);
641 for (Link link : links) {
642 if (isLast) {
Piere99511d2018-04-19 16:47:06 +0200643 isLast = removePortFromDevice(link.src().deviceId(), link.src().port(), mcastIp,
644 mcastUtils.assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
Pier Luigi92e69be2018-03-02 12:53:37 +0100645 if (isLast) {
Piere99511d2018-04-19 16:47:06 +0200646 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, link.src().deviceId(), source));
Pier Luigi92e69be2018-03-02 12:53:37 +0100647 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100648 }
Charles Chanc91c8782016-03-30 17:54:24 -0700649 }
650 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100651 } finally {
652 mcastUnlock();
Charles Chanc91c8782016-03-30 17:54:24 -0700653 }
654 }
655
Pier7b657162018-03-27 11:29:42 -0700656
657 /**
658 * Process sinks to be added.
659 *
Piere99511d2018-04-19 16:47:06 +0200660 * @param sources the source connect points
Pier7b657162018-03-27 11:29:42 -0700661 * @param mcastIp the group IP
662 * @param newSinks the new sinks to be processed
663 * @param allPrevSinks all previous sinks
664 */
Piere99511d2018-04-19 16:47:06 +0200665 private void processSinksAddedInternal(Set<ConnectPoint> sources, IpAddress mcastIp,
Pier7b657162018-03-27 11:29:42 -0700666 Map<HostId, Set<ConnectPoint>> newSinks,
667 Set<ConnectPoint> allPrevSinks) {
668 lastMcastChange = Instant.now();
669 mcastLock();
670 try {
Pierdb27b8d2018-04-17 16:29:56 +0200671 if (!mcastUtils.isLeader(mcastIp)) {
672 log.debug("Skip {} due to lack of leadership", mcastIp);
673 return;
674 }
Piere99511d2018-04-19 16:47:06 +0200675 log.debug("Processing sinks added for group {} and for sources {}", mcastIp, sources);
676 sources.forEach(source -> {
677 Set<ConnectPoint> sinksToBeAdded = processSinksToBeAdded(source, mcastIp, newSinks);
678 sinksToBeAdded = Sets.difference(sinksToBeAdded, allPrevSinks);
679 sinksToBeAdded.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null));
680 });
Pier7b657162018-03-27 11:29:42 -0700681 } finally {
682 mcastUnlock();
683 }
684 }
685
Charles Chanc91c8782016-03-30 17:54:24 -0700686 /**
687 * Establishes a path from source to sink for given multicast group.
688 *
689 * @param source connect point of the multicast source
690 * @param sink connection point of the multicast sink
691 * @param mcastIp multicast group IP address
692 */
693 private void processSinkAddedInternal(ConnectPoint source, ConnectPoint sink,
Pier7b657162018-03-27 11:29:42 -0700694 IpAddress mcastIp, List<Path> allPaths) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100695 lastMcastChange = Instant.now();
696 mcastLock();
697 try {
Piere99511d2018-04-19 16:47:06 +0200698 log.debug("Processing sink added {} for group {} and for source {}", sink, mcastIp, source);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100699 // Process the ingress device
Pier7b657162018-03-27 11:29:42 -0700700 mcastUtils.addFilterToDevice(source.deviceId(), source.port(),
Piere99511d2018-04-19 16:47:06 +0200701 mcastUtils.assignedVlan(source), mcastIp, INGRESS);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100702 if (source.deviceId().equals(sink.deviceId())) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100703 if (source.port().equals(sink.port())) {
704 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
705 mcastIp, sink, source);
706 return;
707 }
Pier7b657162018-03-27 11:29:42 -0700708 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
Piere99511d2018-04-19 16:47:06 +0200709 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, sink.deviceId(), source), INGRESS);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100710 return;
711 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100712 // Find a path. If present, create/update groups and flows for each hop
Piere99511d2018-04-19 16:47:06 +0200713 Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp, allPaths, source);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100714 if (mcastPath.isPresent()) {
715 List<Link> links = mcastPath.get().links();
Pier1a7e0c02018-03-12 15:00:54 -0700716 // Setup mcast role for ingress
Piere99511d2018-04-19 16:47:06 +0200717 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, source.deviceId(), source), INGRESS);
718 // Setup properly the transit forwarding
Pier Luigi35dab3f2018-01-25 16:16:02 +0100719 links.forEach(link -> {
720 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
Pier7b657162018-03-27 11:29:42 -0700721 mcastUtils.assignedVlan(link.src().deviceId()
722 .equals(source.deviceId()) ? source : null));
723 mcastUtils.addFilterToDevice(link.dst().deviceId(), link.dst().port(),
724 mcastUtils.assignedVlan(null), mcastIp, null);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100725 });
Pier1a7e0c02018-03-12 15:00:54 -0700726 // Setup mcast role for the transit
727 links.stream()
728 .filter(link -> !link.dst().deviceId().equals(sink.deviceId()))
Piere99511d2018-04-19 16:47:06 +0200729 .forEach(link -> mcastRoleStore.put(new McastRoleStoreKey(mcastIp, link.dst().deviceId(),
730 source), TRANSIT));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100731 // Process the egress device
Pier7b657162018-03-27 11:29:42 -0700732 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
Pier1a7e0c02018-03-12 15:00:54 -0700733 // Setup mcast role for egress
Piere99511d2018-04-19 16:47:06 +0200734 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, sink.deviceId(), source), EGRESS);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100735 } else {
Piere99511d2018-04-19 16:47:06 +0200736 log.warn("Unable to find a path from {} to {}. Abort sinkAdded", source.deviceId(), sink.deviceId());
Pier Luigi35dab3f2018-01-25 16:16:02 +0100737 }
738 } finally {
739 mcastUnlock();
Charles Chanc91c8782016-03-30 17:54:24 -0700740 }
741 }
742
743 /**
Charles Chan72779502016-04-23 17:36:10 -0700744 * Processes the LINK_DOWN event.
745 *
746 * @param affectedLink Link that is going down
747 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100748 public void processLinkDown(Link affectedLink) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100749 lastMcastChange = Instant.now();
750 mcastLock();
751 try {
752 // Get groups affected by the link down event
753 getAffectedGroups(affectedLink).forEach(mcastIp -> {
Piere99511d2018-04-19 16:47:06 +0200754 log.debug("Processing link down {} for group {}", affectedLink, mcastIp);
755 recoverFailure(mcastIp, affectedLink);
Charles Chan72779502016-04-23 17:36:10 -0700756 });
Pier Luigi35dab3f2018-01-25 16:16:02 +0100757 } finally {
758 mcastUnlock();
759 }
Charles Chan72779502016-04-23 17:36:10 -0700760 }
761
762 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +0100763 * Process the DEVICE_DOWN event.
764 *
765 * @param deviceDown device going down
766 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100767 public void processDeviceDown(DeviceId deviceDown) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100768 lastMcastChange = Instant.now();
769 mcastLock();
770 try {
771 // Get the mcast groups affected by the device going down
772 getAffectedGroups(deviceDown).forEach(mcastIp -> {
Piere99511d2018-04-19 16:47:06 +0200773 log.debug("Processing device down {} for group {}", deviceDown, mcastIp);
774 recoverFailure(mcastIp, deviceDown);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100775 });
776 } finally {
777 mcastUnlock();
778 }
Pier Luigi580fd8a2018-01-16 10:47:50 +0100779 }
780
781 /**
Piere99511d2018-04-19 16:47:06 +0200782 * General failure recovery procedure.
783 *
784 * @param mcastIp the group to recover
785 * @param failedElement the failed element
786 */
787 private void recoverFailure(IpAddress mcastIp, Object failedElement) {
788 // TODO Optimize when the group editing is in place
789 if (!mcastUtils.isLeader(mcastIp)) {
790 log.debug("Skip {} due to lack of leadership", mcastIp);
791 return;
792 }
793 // Do not proceed if the sources of this group are missing
794 Set<ConnectPoint> sources = getSources(mcastIp);
795 if (sources.isEmpty()) {
796 log.warn("Missing sources for group {}", mcastIp);
797 return;
798 }
799 // Find out the ingress devices of the affected group
800 // If sinks are in other leafs, we have ingress, transit, egress, and source
801 // If sinks are in the same leaf, we have just ingress and source
802 Set<DeviceId> ingressDevices = getDevice(mcastIp, INGRESS);
803 if (ingressDevices.isEmpty()) {
804 log.warn("Missing ingress devices for group {}", ingressDevices, mcastIp);
805 return;
806 }
807 // For each tree, delete ingress-transit part
808 sources.forEach(source -> {
809 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
810 transitDevices.forEach(transitDevice -> {
811 removeGroupFromDevice(transitDevice, mcastIp, mcastUtils.assignedVlan(null));
812 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, transitDevice, source));
813 });
814 });
815 removeIngressTransitPorts(mcastIp, ingressDevices, sources);
816 // TODO Evaluate the possibility of building optimize trees between sources
817 Map<DeviceId, Set<ConnectPoint>> notRecovered = Maps.newHashMap();
818 sources.forEach(source -> {
819 Set<DeviceId> notRecoveredInternal = Sets.newHashSet();
820 DeviceId ingressDevice = ingressDevices.stream()
821 .filter(deviceId -> deviceId.equals(source.deviceId())).findFirst().orElse(null);
822 // Clean also the ingress
823 if (failedElement instanceof DeviceId && ingressDevice.equals(failedElement)) {
824 removeGroupFromDevice((DeviceId) failedElement, mcastIp, mcastUtils.assignedVlan(source));
825 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, (DeviceId) failedElement, source));
826 }
827 if (ingressDevice == null) {
828 log.warn("Skip failure recovery - " +
829 "Missing ingress for source {} and group {}", source, mcastIp);
830 return;
831 }
832 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
833 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(ingressDevice, egressDevices);
834 // We have to verify, if there are egresses without paths
835 mcastTree.forEach((egressDevice, paths) -> {
836 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
837 mcastIp, paths, source);
838 // No paths, we have to try with alternative location
839 if (!mcastPath.isPresent()) {
840 notRecovered.compute(egressDevice, (deviceId, listSources) -> {
841 listSources = listSources == null ? Sets.newHashSet() : listSources;
842 listSources.add(source);
843 return listSources;
844 });
845 notRecoveredInternal.add(egressDevice);
846 }
847 });
848 // Fast path, we can recover all the locations
849 if (notRecoveredInternal.isEmpty()) {
850 mcastTree.forEach((egressDevice, paths) -> {
Charles Chanba59dd62018-05-10 22:19:49 +0000851 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
852 mcastIp, paths, source);
853 if (mcastPath.isPresent()) {
854 installPath(mcastIp, source, mcastPath.get());
855 }
Piere99511d2018-04-19 16:47:06 +0200856 });
857 } else {
858 // Let's try to recover using alternative locations
859 recoverSinks(egressDevices, notRecoveredInternal, mcastIp,
860 ingressDevice, source);
861 }
862 });
863 // Finally remove the egresses not recovered
864 notRecovered.forEach((egressDevice, listSources) -> {
865 Set<ConnectPoint> currentSources = getSources(mcastIp, egressDevice, EGRESS);
866 if (Objects.equal(currentSources, listSources)) {
867 log.warn("Fail to recover egress device {} from {} failure {}",
868 egressDevice, failedElement instanceof Link ? "Link" : "Device", failedElement);
869 removeGroupFromDevice(egressDevice, mcastIp, mcastUtils.assignedVlan(null));
870 }
871 listSources.forEach(source -> mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, egressDevice, source)));
872 });
873 }
874
875 /**
Pier7b657162018-03-27 11:29:42 -0700876 * Try to recover sinks using alternate locations.
877 *
878 * @param egressDevices the original egress devices
879 * @param notRecovered the devices not recovered
880 * @param mcastIp the group address
881 * @param ingressDevice the ingress device
882 * @param source the source connect point
Pier7b657162018-03-27 11:29:42 -0700883 */
884 private void recoverSinks(Set<DeviceId> egressDevices, Set<DeviceId> notRecovered,
Piere99511d2018-04-19 16:47:06 +0200885 IpAddress mcastIp, DeviceId ingressDevice, ConnectPoint source) {
886 log.debug("Processing recover sinks for group {} and for source {}",
887 mcastIp, source);
Pier7b657162018-03-27 11:29:42 -0700888 Set<DeviceId> recovered = Sets.difference(egressDevices, notRecovered);
Pier7b657162018-03-27 11:29:42 -0700889 Set<ConnectPoint> totalAffectedSinks = Sets.newHashSet();
Pier7b657162018-03-27 11:29:42 -0700890 Set<ConnectPoint> totalSinks = Sets.newHashSet();
891 // Let's compute all the affected sinks and all the sinks
892 notRecovered.forEach(deviceId -> {
893 totalAffectedSinks.addAll(
Charles Chanba59dd62018-05-10 22:19:49 +0000894 mcastUtils.getAffectedSinks(deviceId, mcastIp).values().stream()
895 .flatMap(Collection::stream)
Pier7b657162018-03-27 11:29:42 -0700896 .filter(connectPoint -> connectPoint.deviceId().equals(deviceId))
Charles Chanba59dd62018-05-10 22:19:49 +0000897 .collect(Collectors.toSet())
898 );
Pier7b657162018-03-27 11:29:42 -0700899 totalSinks.addAll(
Piere99511d2018-04-19 16:47:06 +0200900 mcastUtils.getAffectedSinks(deviceId, mcastIp).values().stream()
Charles Chanba59dd62018-05-10 22:19:49 +0000901 .flatMap(Collection::stream).collect(Collectors.toSet())
902 );
Pier7b657162018-03-27 11:29:42 -0700903 });
Pier7b657162018-03-27 11:29:42 -0700904 Set<ConnectPoint> sinksToBeAdded = Sets.difference(totalSinks, totalAffectedSinks);
Piere99511d2018-04-19 16:47:06 +0200905 Set<DeviceId> newEgressDevices = sinksToBeAdded.stream()
906 .map(ConnectPoint::deviceId).collect(Collectors.toSet());
907 newEgressDevices.addAll(recovered);
908 Set<DeviceId> copyNewEgressDevices = ImmutableSet.copyOf(newEgressDevices);
909 newEgressDevices = newEgressDevices.stream()
910 .filter(deviceId -> !deviceId.equals(ingressDevice)).collect(Collectors.toSet());
911 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(ingressDevice, newEgressDevices);
Pier7b657162018-03-27 11:29:42 -0700912 // if the source was originally in the new locations, add new sinks
Piere99511d2018-04-19 16:47:06 +0200913 if (copyNewEgressDevices.contains(ingressDevice)) {
Pier7b657162018-03-27 11:29:42 -0700914 sinksToBeAdded.stream()
915 .filter(connectPoint -> connectPoint.deviceId().equals(ingressDevice))
916 .forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, ImmutableList.of()));
917 }
Pier7b657162018-03-27 11:29:42 -0700918 // Construct a new path for each egress device
919 mcastTree.forEach((egressDevice, paths) -> {
Piere99511d2018-04-19 16:47:06 +0200920 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice, mcastIp, paths, source);
Pier7b657162018-03-27 11:29:42 -0700921 if (mcastPath.isPresent()) {
922 // Using recovery procedure
923 if (recovered.contains(egressDevice)) {
924 installPath(mcastIp, source, mcastPath.get());
925 } else {
926 // otherwise we need to threat as new sink
927 sinksToBeAdded.stream()
928 .filter(connectPoint -> connectPoint.deviceId().equals(egressDevice))
929 .forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, paths));
930 }
Pier7b657162018-03-27 11:29:42 -0700931 }
932 });
Pier7b657162018-03-27 11:29:42 -0700933 }
934
935 /**
Pier28164682018-04-17 15:50:43 +0200936 * Process all the sinks related to a mcast group and return
937 * the ones to be removed.
938 *
939 * @param mcastIp the group address
940 * @param prevsinks the previous sinks to be evaluated
941 * @param newSinks the new sinks to be evaluted
Piere99511d2018-04-19 16:47:06 +0200942 * @param source the source connect point
Pier28164682018-04-17 15:50:43 +0200943 * @return the set of the sinks to be removed
944 */
945 private Set<ConnectPoint> processSinksToBeRemoved(IpAddress mcastIp,
946 Map<HostId, Set<ConnectPoint>> prevsinks,
Piere99511d2018-04-19 16:47:06 +0200947 Map<HostId, Set<ConnectPoint>> newSinks,
948 ConnectPoint source) {
Pier28164682018-04-17 15:50:43 +0200949 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
950 prevsinks.forEach(((hostId, connectPoints) -> {
951 // We have to check with the existing flows
952 ConnectPoint sinkToBeProcessed = connectPoints.stream()
Piere99511d2018-04-19 16:47:06 +0200953 .filter(connectPoint -> isSinkForSource(mcastIp, connectPoint, source))
Pier28164682018-04-17 15:50:43 +0200954 .findFirst().orElse(null);
955 if (sinkToBeProcessed != null) {
956 // If the host has been removed or location has been removed
957 if (!newSinks.containsKey(hostId) ||
958 !newSinks.get(hostId).contains(sinkToBeProcessed)) {
959 sinksToBeProcessed.add(sinkToBeProcessed);
960 }
961 }
962 }));
963 // We have done, return the set
964 return sinksToBeProcessed;
965 }
966
967 /**
Pier7b657162018-03-27 11:29:42 -0700968 * Process new locations and return the set of sinks to be added
969 * in the context of the recovery.
970 *
Pier28164682018-04-17 15:50:43 +0200971 * @param newSinks the remaining sinks
972 * @param prevSinks the previous sinks
Piere99511d2018-04-19 16:47:06 +0200973 * @param source the source connect point
Pier7b657162018-03-27 11:29:42 -0700974 * @return the set of the sinks to be processed
975 */
Charles Chanba59dd62018-05-10 22:19:49 +0000976 private Set<ConnectPoint> processSinksToBeRecovered(IpAddress mcastIp,
977 Map<HostId, Set<ConnectPoint>> newSinks,
Piere99511d2018-04-19 16:47:06 +0200978 Map<HostId, Set<ConnectPoint>> prevSinks,
979 ConnectPoint source) {
Pier7b657162018-03-27 11:29:42 -0700980 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
Pier28164682018-04-17 15:50:43 +0200981 newSinks.forEach((hostId, connectPoints) -> {
Pier7b657162018-03-27 11:29:42 -0700982 // If it has more than 1 locations
983 if (connectPoints.size() > 1 || connectPoints.size() == 0) {
984 log.debug("Skip {} since sink {} has {} locations",
985 mcastIp, hostId, connectPoints.size());
986 return;
987 }
Pier28164682018-04-17 15:50:43 +0200988 // If previously it had two locations, we need to recover it
989 // Filter out if the remaining location is already served
990 if (prevSinks.containsKey(hostId) && prevSinks.get(hostId).size() == 2) {
Pier665b0fc2018-04-19 15:53:20 +0200991 ConnectPoint sinkToBeProcessed = connectPoints.stream()
Piere99511d2018-04-19 16:47:06 +0200992 .filter(connectPoint -> !isSinkForSource(mcastIp, connectPoint, source))
Pier665b0fc2018-04-19 15:53:20 +0200993 .findFirst().orElse(null);
994 if (sinkToBeProcessed != null) {
995 sinksToBeProcessed.add(sinkToBeProcessed);
996 }
Pier28164682018-04-17 15:50:43 +0200997 }
Pier7b657162018-03-27 11:29:42 -0700998 });
999 return sinksToBeProcessed;
1000 }
1001
1002 /**
1003 * Process all the sinks related to a mcast group and return
1004 * the ones to be processed.
1005 *
1006 * @param source the source connect point
1007 * @param mcastIp the group address
1008 * @param sinks the sinks to be evaluated
1009 * @return the set of the sinks to be processed
1010 */
1011 private Set<ConnectPoint> processSinksToBeAdded(ConnectPoint source, IpAddress mcastIp,
1012 Map<HostId, Set<ConnectPoint>> sinks) {
Pier7b657162018-03-27 11:29:42 -07001013 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
1014 sinks.forEach(((hostId, connectPoints) -> {
1015 // If it has more than 2 locations
1016 if (connectPoints.size() > 2 || connectPoints.size() == 0) {
1017 log.debug("Skip {} since sink {} has {} locations",
1018 mcastIp, hostId, connectPoints.size());
1019 return;
1020 }
1021 // If it has one location, just use it
1022 if (connectPoints.size() == 1) {
Piere99511d2018-04-19 16:47:06 +02001023 sinksToBeProcessed.add(connectPoints.stream().findFirst().orElse(null));
Pier7b657162018-03-27 11:29:42 -07001024 return;
1025 }
1026 // We prefer to reuse existing flows
1027 ConnectPoint sinkToBeProcessed = connectPoints.stream()
Piere99511d2018-04-19 16:47:06 +02001028 .filter(connectPoint -> {
1029 if (!isSinkForGroup(mcastIp, connectPoint, source)) {
1030 return false;
1031 }
1032 if (!isSinkReachable(mcastIp, connectPoint, source)) {
1033 return false;
1034 }
1035 ConnectPoint other = connectPoints.stream()
Charles Chanba59dd62018-05-10 22:19:49 +00001036 .filter(remaining -> !remaining.equals(connectPoint))
1037 .findFirst().orElse(null);
Piere99511d2018-04-19 16:47:06 +02001038 // We are already serving the sink
1039 return !isSinkForSource(mcastIp, other, source);
1040 }).findFirst().orElse(null);
1041
Pier7b657162018-03-27 11:29:42 -07001042 if (sinkToBeProcessed != null) {
1043 sinksToBeProcessed.add(sinkToBeProcessed);
1044 return;
1045 }
1046 // Otherwise we prefer to reuse existing egresses
Piere99511d2018-04-19 16:47:06 +02001047 Set<DeviceId> egresses = getDevice(mcastIp, EGRESS, source);
Pier7b657162018-03-27 11:29:42 -07001048 sinkToBeProcessed = connectPoints.stream()
Piere99511d2018-04-19 16:47:06 +02001049 .filter(connectPoint -> {
1050 if (!egresses.contains(connectPoint.deviceId())) {
1051 return false;
1052 }
1053 if (!isSinkReachable(mcastIp, connectPoint, source)) {
1054 return false;
1055 }
1056 ConnectPoint other = connectPoints.stream()
Charles Chanba59dd62018-05-10 22:19:49 +00001057 .filter(remaining -> !remaining.equals(connectPoint))
1058 .findFirst().orElse(null);
Piere99511d2018-04-19 16:47:06 +02001059 return !isSinkForSource(mcastIp, other, source);
1060 }).findFirst().orElse(null);
Pier7b657162018-03-27 11:29:42 -07001061 if (sinkToBeProcessed != null) {
1062 sinksToBeProcessed.add(sinkToBeProcessed);
1063 return;
1064 }
1065 // Otherwise we prefer a location co-located with the source (if it exists)
1066 sinkToBeProcessed = connectPoints.stream()
1067 .filter(connectPoint -> connectPoint.deviceId().equals(source.deviceId()))
1068 .findFirst().orElse(null);
1069 if (sinkToBeProcessed != null) {
1070 sinksToBeProcessed.add(sinkToBeProcessed);
1071 return;
1072 }
Piere99511d2018-04-19 16:47:06 +02001073 // Finally, we randomly pick a new location if it is reachable
1074 sinkToBeProcessed = connectPoints.stream()
1075 .filter(connectPoint -> {
1076 if (!isSinkReachable(mcastIp, connectPoint, source)) {
1077 return false;
1078 }
1079 ConnectPoint other = connectPoints.stream()
Charles Chanba59dd62018-05-10 22:19:49 +00001080 .filter(remaining -> !remaining.equals(connectPoint))
1081 .findFirst().orElse(null);
Piere99511d2018-04-19 16:47:06 +02001082 return !isSinkForSource(mcastIp, other, source);
1083 }).findFirst().orElse(null);
1084 if (sinkToBeProcessed != null) {
1085 sinksToBeProcessed.add(sinkToBeProcessed);
1086 }
Pier7b657162018-03-27 11:29:42 -07001087 }));
Pier7b657162018-03-27 11:29:42 -07001088 return sinksToBeProcessed;
1089 }
1090
1091 /**
Pier1a7e0c02018-03-12 15:00:54 -07001092 * Utility method to remove all the ingress transit ports.
1093 *
1094 * @param mcastIp the group ip
Piere99511d2018-04-19 16:47:06 +02001095 * @param ingressDevices the ingress devices
1096 * @param sources the source connect points
Pier1a7e0c02018-03-12 15:00:54 -07001097 */
Piere99511d2018-04-19 16:47:06 +02001098 private void removeIngressTransitPorts(IpAddress mcastIp, Set<DeviceId> ingressDevices,
1099 Set<ConnectPoint> sources) {
1100 Map<ConnectPoint, Set<PortNumber>> ingressTransitPorts = Maps.newHashMap();
1101 sources.forEach(source -> {
1102 DeviceId ingressDevice = ingressDevices.stream()
1103 .filter(deviceId -> deviceId.equals(source.deviceId()))
1104 .findFirst().orElse(null);
1105 if (ingressDevice == null) {
1106 log.warn("Skip removeIngressTransitPorts - " +
1107 "Missing ingress for source {} and group {}",
1108 source, mcastIp);
1109 return;
Pier1a7e0c02018-03-12 15:00:54 -07001110 }
Andrea Campanella5b4cd652018-06-05 14:19:21 +02001111 Set<PortNumber> ingressTransitPort = ingressTransitPort(mcastIp, ingressDevice, source);
1112 if (ingressTransitPort.isEmpty()) {
1113 log.warn("No transit ports to remove on device {}", ingressDevice);
1114 return;
1115 }
1116 ingressTransitPorts.put(source, ingressTransitPort);
Pier1a7e0c02018-03-12 15:00:54 -07001117 });
Piere99511d2018-04-19 16:47:06 +02001118 ingressTransitPorts.forEach((source, ports) -> ports.forEach(ingressTransitPort -> {
1119 DeviceId ingressDevice = ingressDevices.stream()
Charles Chanba59dd62018-05-10 22:19:49 +00001120 .filter(deviceId -> deviceId.equals(source.deviceId()))
1121 .findFirst().orElse(null);
Piere99511d2018-04-19 16:47:06 +02001122 boolean isLast = removePortFromDevice(ingressDevice, ingressTransitPort,
1123 mcastIp, mcastUtils.assignedVlan(source));
1124 if (isLast) {
1125 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, ingressDevice, source));
1126 }
1127 }));
Pier1a7e0c02018-03-12 15:00:54 -07001128 }
1129
1130 /**
Charles Chanc91c8782016-03-30 17:54:24 -07001131 * Adds a port to given multicast group on given device. This involves the
1132 * update of L3 multicast group and multicast routing table entry.
1133 *
1134 * @param deviceId device ID
1135 * @param port port to be added
1136 * @param mcastIp multicast group
1137 * @param assignedVlan assigned VLAN ID
1138 */
Charles Chanba59dd62018-05-10 22:19:49 +00001139 private void addPortToDevice(DeviceId deviceId, PortNumber port,
1140 IpAddress mcastIp, VlanId assignedVlan) {
Piere99511d2018-04-19 16:47:06 +02001141 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId, assignedVlan);
Charles Chanc91c8782016-03-30 17:54:24 -07001142 ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
Pier Luigi4f0dd212018-01-19 10:24:53 +01001143 NextObjective newNextObj;
Charles Chan72779502016-04-23 17:36:10 -07001144 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Charles Chanc91c8782016-03-30 17:54:24 -07001145 // First time someone request this mcast group via this device
1146 portBuilder.add(port);
Pier Luigi4f0dd212018-01-19 10:24:53 +01001147 // New nextObj
Charles Chanba59dd62018-05-10 22:19:49 +00001148 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
1149 portBuilder.build(), null).add();
Pier Luigi4f0dd212018-01-19 10:24:53 +01001150 // Store the new port
1151 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Charles Chanc91c8782016-03-30 17:54:24 -07001152 } else {
1153 // This device already serves some subscribers of this mcast group
Charles Chan72779502016-04-23 17:36:10 -07001154 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chanc91c8782016-03-30 17:54:24 -07001155 // Stop if the port is already in the nextobj
Pier7b657162018-03-27 11:29:42 -07001156 Set<PortNumber> existingPorts = mcastUtils.getPorts(nextObj.next());
Charles Chanc91c8782016-03-30 17:54:24 -07001157 if (existingPorts.contains(port)) {
1158 log.info("NextObj for {}/{} already exists. Abort", deviceId, port);
1159 return;
1160 }
Pier Luigi4f0dd212018-01-19 10:24:53 +01001161 // Let's add the port and reuse the previous one
Yuta HIGUCHIbef07b52018-02-09 18:05:23 -08001162 portBuilder.addAll(existingPorts).add(port);
Pier Luigi4f0dd212018-01-19 10:24:53 +01001163 // Reuse previous nextObj
Pier7b657162018-03-27 11:29:42 -07001164 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi4f0dd212018-01-19 10:24:53 +01001165 portBuilder.build(), nextObj.id()).addToExisting();
1166 // Store the final next objective and send only the difference to the driver
1167 mcastNextObjStore.put(mcastStoreKey, newNextObj);
1168 // Add just the new port
1169 portBuilder = ImmutableSet.builder();
1170 portBuilder.add(port);
Pier7b657162018-03-27 11:29:42 -07001171 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi4f0dd212018-01-19 10:24:53 +01001172 portBuilder.build(), nextObj.id()).addToExisting();
Charles Chanc91c8782016-03-30 17:54:24 -07001173 }
1174 // Create, store and apply the new nextObj and fwdObj
Charles Chan72779502016-04-23 17:36:10 -07001175 ObjectiveContext context = new DefaultObjectiveContext(
1176 (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}",
1177 mcastIp, deviceId, port.toLong(), assignedVlan),
Piere99511d2018-04-19 16:47:06 +02001178 (objective, error) -> log.warn("Failed to add {} on {}/{}, vlan {}: {}",
Charles Chan72779502016-04-23 17:36:10 -07001179 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Pier7b657162018-03-27 11:29:42 -07001180 ForwardingObjective fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan,
1181 newNextObj.id()).add(context);
Charles Chanc91c8782016-03-30 17:54:24 -07001182 srManager.flowObjectiveService.next(deviceId, newNextObj);
1183 srManager.flowObjectiveService.forward(deviceId, fwdObj);
Charles Chanc91c8782016-03-30 17:54:24 -07001184 }
1185
1186 /**
1187 * Removes a port from given multicast group on given device.
1188 * This involves the update of L3 multicast group and multicast routing
1189 * table entry.
1190 *
1191 * @param deviceId device ID
1192 * @param port port to be added
1193 * @param mcastIp multicast group
1194 * @param assignedVlan assigned VLAN ID
1195 * @return true if this is the last sink on this device
1196 */
Charles Chanba59dd62018-05-10 22:19:49 +00001197 private boolean removePortFromDevice(DeviceId deviceId, PortNumber port,
1198 IpAddress mcastIp, VlanId assignedVlan) {
Charles Chan72779502016-04-23 17:36:10 -07001199 McastStoreKey mcastStoreKey =
Piere99511d2018-04-19 16:47:06 +02001200 new McastStoreKey(mcastIp, deviceId, assignedVlan);
Charles Chanc91c8782016-03-30 17:54:24 -07001201 // This device is not serving this multicast group
Charles Chan72779502016-04-23 17:36:10 -07001202 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Piere99511d2018-04-19 16:47:06 +02001203 return true;
Charles Chanc91c8782016-03-30 17:54:24 -07001204 }
Charles Chan72779502016-04-23 17:36:10 -07001205 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Pier7b657162018-03-27 11:29:42 -07001206 Set<PortNumber> existingPorts = mcastUtils.getPorts(nextObj.next());
Charles Chan72779502016-04-23 17:36:10 -07001207 // This port does not serve this multicast group
Charles Chanc91c8782016-03-30 17:54:24 -07001208 if (!existingPorts.contains(port)) {
Piere99511d2018-04-19 16:47:06 +02001209 if (!existingPorts.isEmpty()) {
1210 log.warn("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
1211 return false;
1212 }
1213 return true;
Charles Chanc91c8782016-03-30 17:54:24 -07001214 }
1215 // Copy and modify the ImmutableSet
1216 existingPorts = Sets.newHashSet(existingPorts);
1217 existingPorts.remove(port);
Charles Chanc91c8782016-03-30 17:54:24 -07001218 NextObjective newNextObj;
Pier Luigi8cd46de2018-01-19 10:24:53 +01001219 ObjectiveContext context;
Charles Chanc91c8782016-03-30 17:54:24 -07001220 ForwardingObjective fwdObj;
1221 if (existingPorts.isEmpty()) {
Pier Luigi8cd46de2018-01-19 10:24:53 +01001222 context = new DefaultObjectiveContext(
Charles Chan72779502016-04-23 17:36:10 -07001223 (objective) -> log.debug("Successfully remove {} on {}/{}, vlan {}",
1224 mcastIp, deviceId, port.toLong(), assignedVlan),
Piere99511d2018-04-19 16:47:06 +02001225 (objective, error) -> log.warn("Failed to remove {} on {}/{}, vlan {}: {}",
Charles Chan72779502016-04-23 17:36:10 -07001226 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Pier7b657162018-03-27 11:29:42 -07001227 fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
Charles Chan72779502016-04-23 17:36:10 -07001228 mcastNextObjStore.remove(mcastStoreKey);
Charles Chanc91c8782016-03-30 17:54:24 -07001229 } else {
1230 // If this is not the last sink, update flows and groups
Pier Luigi8cd46de2018-01-19 10:24:53 +01001231 context = new DefaultObjectiveContext(
Charles Chan72779502016-04-23 17:36:10 -07001232 (objective) -> log.debug("Successfully update {} on {}/{}, vlan {}",
1233 mcastIp, deviceId, port.toLong(), assignedVlan),
Piere99511d2018-04-19 16:47:06 +02001234 (objective, error) -> log.warn("Failed to update {} on {}/{}, vlan {}: {}",
Charles Chan72779502016-04-23 17:36:10 -07001235 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Pier Luigi8cd46de2018-01-19 10:24:53 +01001236 // Here we store the next objective with the remaining port
Pier7b657162018-03-27 11:29:42 -07001237 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi8cd46de2018-01-19 10:24:53 +01001238 existingPorts, nextObj.id()).removeFromExisting();
Pier7b657162018-03-27 11:29:42 -07001239 fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add(context);
Charles Chan72779502016-04-23 17:36:10 -07001240 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Charles Chanc91c8782016-03-30 17:54:24 -07001241 }
Pier Luigi8cd46de2018-01-19 10:24:53 +01001242 // Let's modify the next objective removing the bucket
Pier7b657162018-03-27 11:29:42 -07001243 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi8cd46de2018-01-19 10:24:53 +01001244 ImmutableSet.of(port), nextObj.id()).removeFromExisting();
1245 srManager.flowObjectiveService.next(deviceId, newNextObj);
1246 srManager.flowObjectiveService.forward(deviceId, fwdObj);
Charles Chanc91c8782016-03-30 17:54:24 -07001247 return existingPorts.isEmpty();
1248 }
1249
Charles Chan72779502016-04-23 17:36:10 -07001250 /**
1251 * Removes entire group on given device.
1252 *
1253 * @param deviceId device ID
1254 * @param mcastIp multicast group to be removed
1255 * @param assignedVlan assigned VLAN ID
1256 */
Charles Chanba59dd62018-05-10 22:19:49 +00001257 private void removeGroupFromDevice(DeviceId deviceId, IpAddress mcastIp,
1258 VlanId assignedVlan) {
Piere99511d2018-04-19 16:47:06 +02001259 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId, assignedVlan);
Charles Chan72779502016-04-23 17:36:10 -07001260 // This device is not serving this multicast group
1261 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
1262 log.warn("{} is not serving {}. Abort.", deviceId, mcastIp);
1263 return;
1264 }
1265 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chan72779502016-04-23 17:36:10 -07001266 ObjectiveContext context = new DefaultObjectiveContext(
1267 (objective) -> log.debug("Successfully remove {} on {}, vlan {}",
1268 mcastIp, deviceId, assignedVlan),
Piere99511d2018-04-19 16:47:06 +02001269 (objective, error) -> log.warn("Failed to remove {} on {}, vlan {}: {}",
Charles Chan72779502016-04-23 17:36:10 -07001270 mcastIp, deviceId, assignedVlan, error));
Pier7b657162018-03-27 11:29:42 -07001271 ForwardingObjective fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
Charles Chan72779502016-04-23 17:36:10 -07001272 srManager.flowObjectiveService.forward(deviceId, fwdObj);
1273 mcastNextObjStore.remove(mcastStoreKey);
Charles Chan72779502016-04-23 17:36:10 -07001274 }
1275
Pier Luigi580fd8a2018-01-16 10:47:50 +01001276 private void installPath(IpAddress mcastIp, ConnectPoint source, Path mcastPath) {
Pier Luigi580fd8a2018-01-16 10:47:50 +01001277 List<Link> links = mcastPath.links();
Pier1a7e0c02018-03-12 15:00:54 -07001278 // Setup new ingress mcast role
Piere99511d2018-04-19 16:47:06 +02001279 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, links.get(0).src().deviceId(), source),
Pier1a7e0c02018-03-12 15:00:54 -07001280 INGRESS);
Pier Luigi580fd8a2018-01-16 10:47:50 +01001281 // For each link, modify the next on the source device adding the src port
1282 // and a new filter objective on the destination port
1283 links.forEach(link -> {
1284 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
Pier7b657162018-03-27 11:29:42 -07001285 mcastUtils.assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
1286 mcastUtils.addFilterToDevice(link.dst().deviceId(), link.dst().port(),
1287 mcastUtils.assignedVlan(null), mcastIp, null);
Pier Luigi580fd8a2018-01-16 10:47:50 +01001288 });
Pier1a7e0c02018-03-12 15:00:54 -07001289 // Setup mcast role for the transit
1290 links.stream()
1291 .filter(link -> !link.src().deviceId().equals(source.deviceId()))
Piere99511d2018-04-19 16:47:06 +02001292 .forEach(link -> mcastRoleStore.put(new McastRoleStoreKey(mcastIp, link.src().deviceId(), source),
Pier1a7e0c02018-03-12 15:00:54 -07001293 TRANSIT));
Charles Chan72779502016-04-23 17:36:10 -07001294 }
1295
Charles Chanc91c8782016-03-30 17:54:24 -07001296 /**
Pier1f87aca2018-03-14 16:47:32 -07001297 * Go through all the paths, looking for shared links to be used
1298 * in the final path computation.
1299 *
1300 * @param egresses egress devices
1301 * @param availablePaths all the available paths towards the egress
1302 * @return shared links between egress devices
1303 */
Charles Chanba59dd62018-05-10 22:19:49 +00001304 private Set<Link> exploreMcastTree(Set<DeviceId> egresses,
1305 Map<DeviceId, List<Path>> availablePaths) {
Pier1f87aca2018-03-14 16:47:32 -07001306 int minLength = Integer.MAX_VALUE;
1307 int length;
Pier1f87aca2018-03-14 16:47:32 -07001308 List<Path> currentPaths;
1309 // Verify the source can still reach all the egresses
1310 for (DeviceId egress : egresses) {
1311 // From the source we cannot reach all the sinks
Pier7b657162018-03-27 11:29:42 -07001312 // just continue and let's figure out after
Pier1f87aca2018-03-14 16:47:32 -07001313 currentPaths = availablePaths.get(egress);
1314 if (currentPaths.isEmpty()) {
1315 continue;
1316 }
Piere99511d2018-04-19 16:47:06 +02001317 // Get the length of the first one available, update the min length
Pier1f87aca2018-03-14 16:47:32 -07001318 length = currentPaths.get(0).links().size();
1319 if (length < minLength) {
1320 minLength = length;
1321 }
Pier Luigi51ee7c02018-02-23 19:57:40 +01001322 }
Pier1f87aca2018-03-14 16:47:32 -07001323 // If there are no paths
1324 if (minLength == Integer.MAX_VALUE) {
1325 return Collections.emptySet();
1326 }
Pier1f87aca2018-03-14 16:47:32 -07001327 int index = 0;
Pier1f87aca2018-03-14 16:47:32 -07001328 Set<Link> sharedLinks = Sets.newHashSet();
1329 Set<Link> currentSharedLinks;
1330 Set<Link> currentLinks;
Pier7b657162018-03-27 11:29:42 -07001331 DeviceId egressToRemove = null;
Pier1f87aca2018-03-14 16:47:32 -07001332 // Let's find out the shared links
1333 while (index < minLength) {
1334 // Initialize the intersection with the paths related to the first egress
Piere99511d2018-04-19 16:47:06 +02001335 currentPaths = availablePaths.get(egresses.stream().findFirst().orElse(null));
Pier1f87aca2018-03-14 16:47:32 -07001336 currentSharedLinks = Sets.newHashSet();
1337 // Iterate over the paths and take the "index" links
1338 for (Path path : currentPaths) {
1339 currentSharedLinks.add(path.links().get(index));
1340 }
1341 // Iterate over the remaining egress
1342 for (DeviceId egress : egresses) {
1343 // Iterate over the paths and take the "index" links
1344 currentLinks = Sets.newHashSet();
1345 for (Path path : availablePaths.get(egress)) {
1346 currentLinks.add(path.links().get(index));
1347 }
1348 // Do intersection
1349 currentSharedLinks = Sets.intersection(currentSharedLinks, currentLinks);
1350 // If there are no shared paths exit and record the device to remove
1351 // we have to retry with a subset of sinks
1352 if (currentSharedLinks.isEmpty()) {
Pier7b657162018-03-27 11:29:42 -07001353 egressToRemove = egress;
Pier1f87aca2018-03-14 16:47:32 -07001354 index = minLength;
1355 break;
1356 }
1357 }
1358 sharedLinks.addAll(currentSharedLinks);
1359 index++;
1360 }
Piere99511d2018-04-19 16:47:06 +02001361 // If the shared links is empty and there are egress let's retry another time with less sinks,
1362 // we can still build optimal subtrees
Pier7b657162018-03-27 11:29:42 -07001363 if (sharedLinks.isEmpty() && egresses.size() > 1 && egressToRemove != null) {
1364 egresses.remove(egressToRemove);
Pier1f87aca2018-03-14 16:47:32 -07001365 sharedLinks = exploreMcastTree(egresses, availablePaths);
1366 }
1367 return sharedLinks;
1368 }
1369
1370 /**
1371 * Build Mcast tree having as root the given source and as leaves the given egress points.
1372 *
1373 * @param source source of the tree
1374 * @param sinks leaves of the tree
1375 * @return the computed Mcast tree
1376 */
Charles Chanba59dd62018-05-10 22:19:49 +00001377 private Map<ConnectPoint, List<Path>> computeSinkMcastTree(DeviceId source,
1378 Set<ConnectPoint> sinks) {
Pier1f87aca2018-03-14 16:47:32 -07001379 // Get the egress devices, remove source from the egress if present
Piere99511d2018-04-19 16:47:06 +02001380 Set<DeviceId> egresses = sinks.stream().map(ConnectPoint::deviceId)
1381 .filter(deviceId -> !deviceId.equals(source)).collect(Collectors.toSet());
Pier1f87aca2018-03-14 16:47:32 -07001382 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(source, egresses);
Pier1f87aca2018-03-14 16:47:32 -07001383 final Map<ConnectPoint, List<Path>> finalTree = Maps.newHashMap();
Pier7b657162018-03-27 11:29:42 -07001384 // We need to put back the source if it was originally present
1385 sinks.forEach(sink -> {
1386 List<Path> sinkPaths = mcastTree.get(sink.deviceId());
1387 finalTree.put(sink, sinkPaths != null ? sinkPaths : ImmutableList.of());
1388 });
Pier1f87aca2018-03-14 16:47:32 -07001389 return finalTree;
1390 }
1391
1392 /**
1393 * Build Mcast tree having as root the given source and as leaves the given egress.
1394 *
1395 * @param source source of the tree
1396 * @param egresses leaves of the tree
1397 * @return the computed Mcast tree
1398 */
1399 private Map<DeviceId, List<Path>> computeMcastTree(DeviceId source,
1400 Set<DeviceId> egresses) {
1401 // Pre-compute all the paths
1402 Map<DeviceId, List<Path>> availablePaths = Maps.newHashMap();
Pier1f87aca2018-03-14 16:47:32 -07001403 egresses.forEach(egress -> availablePaths.put(egress, getPaths(source, egress,
1404 Collections.emptySet())));
1405 // Explore the topology looking for shared links amongst the egresses
1406 Set<Link> linksToEnforce = exploreMcastTree(Sets.newHashSet(egresses), availablePaths);
Pier1f87aca2018-03-14 16:47:32 -07001407 // Build the final paths enforcing the shared links between egress devices
Piere99511d2018-04-19 16:47:06 +02001408 availablePaths.clear();
Pier1f87aca2018-03-14 16:47:32 -07001409 egresses.forEach(egress -> availablePaths.put(egress, getPaths(source, egress,
1410 linksToEnforce)));
1411 return availablePaths;
1412 }
1413
1414 /**
1415 * Gets path from src to dst computed using the custom link weigher.
1416 *
1417 * @param src source device ID
1418 * @param dst destination device ID
1419 * @return list of paths from src to dst
1420 */
1421 private List<Path> getPaths(DeviceId src, DeviceId dst, Set<Link> linksToEnforce) {
Pier1f87aca2018-03-14 16:47:32 -07001422 final Topology currentTopology = topologyService.currentTopology();
Pier1f87aca2018-03-14 16:47:32 -07001423 final LinkWeigher linkWeigher = new SRLinkWeigher(srManager, src, linksToEnforce);
Piere99511d2018-04-19 16:47:06 +02001424 List<Path> allPaths = Lists.newArrayList(topologyService.getPaths(currentTopology, src, dst, linkWeigher));
Pier1f87aca2018-03-14 16:47:32 -07001425 log.debug("{} path(s) found from {} to {}", allPaths.size(), src, dst);
1426 return allPaths;
Pier Luigi51ee7c02018-02-23 19:57:40 +01001427 }
1428
Charles Chanc91c8782016-03-30 17:54:24 -07001429 /**
1430 * Gets a path from src to dst.
1431 * If a path was allocated before, returns the allocated path.
1432 * Otherwise, randomly pick one from available paths.
1433 *
1434 * @param src source device ID
1435 * @param dst destination device ID
1436 * @param mcastIp multicast group
Pier1f87aca2018-03-14 16:47:32 -07001437 * @param allPaths paths list
Charles Chanc91c8782016-03-30 17:54:24 -07001438 * @return an optional path from src to dst
1439 */
Piere99511d2018-04-19 16:47:06 +02001440 private Optional<Path> getPath(DeviceId src, DeviceId dst, IpAddress mcastIp,
1441 List<Path> allPaths, ConnectPoint source) {
Pier1f87aca2018-03-14 16:47:32 -07001442 if (allPaths == null) {
1443 allPaths = getPaths(src, dst, Collections.emptySet());
1444 }
Charles Chanc91c8782016-03-30 17:54:24 -07001445 if (allPaths.isEmpty()) {
Charles Chanc91c8782016-03-30 17:54:24 -07001446 return Optional.empty();
1447 }
Piere99511d2018-04-19 16:47:06 +02001448 // Create a map index of suitability-to-list of paths. For example
Pier Luigi91573e12018-01-23 16:06:38 +01001449 // a path in the list associated to the index 1 shares only the
1450 // first hop and it is less suitable of a path belonging to the index
1451 // 2 that shares leaf-spine.
1452 Map<Integer, List<Path>> eligiblePaths = Maps.newHashMap();
Pier Luigi91573e12018-01-23 16:06:38 +01001453 int nhop;
1454 McastStoreKey mcastStoreKey;
Pier Luigi91573e12018-01-23 16:06:38 +01001455 PortNumber srcPort;
1456 Set<PortNumber> existingPorts;
1457 NextObjective nextObj;
Pier Luigi91573e12018-01-23 16:06:38 +01001458 for (Path path : allPaths) {
Pier Luigi91573e12018-01-23 16:06:38 +01001459 if (!src.equals(path.links().get(0).src().deviceId())) {
1460 continue;
1461 }
1462 nhop = 0;
1463 // Iterate over the links
Piere99511d2018-04-19 16:47:06 +02001464 for (Link hop : path.links()) {
1465 VlanId assignedVlan = mcastUtils.assignedVlan(hop.src().deviceId().equals(src) ?
1466 source : null);
1467 mcastStoreKey = new McastStoreKey(mcastIp, hop.src().deviceId(), assignedVlan);
1468 // It does not exist in the store, go to the next link
Pier Luigi91573e12018-01-23 16:06:38 +01001469 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Piere99511d2018-04-19 16:47:06 +02001470 continue;
Charles Chanc91c8782016-03-30 17:54:24 -07001471 }
Pier Luigi91573e12018-01-23 16:06:38 +01001472 nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Pier7b657162018-03-27 11:29:42 -07001473 existingPorts = mcastUtils.getPorts(nextObj.next());
Pier Luigi91573e12018-01-23 16:06:38 +01001474 srcPort = hop.src().port();
Piere99511d2018-04-19 16:47:06 +02001475 // the src port is not used as output, go to the next link
Pier Luigi91573e12018-01-23 16:06:38 +01001476 if (!existingPorts.contains(srcPort)) {
Piere99511d2018-04-19 16:47:06 +02001477 continue;
Pier Luigi91573e12018-01-23 16:06:38 +01001478 }
1479 nhop++;
1480 }
1481 // n_hop defines the index
1482 if (nhop > 0) {
1483 eligiblePaths.compute(nhop, (index, paths) -> {
1484 paths = paths == null ? Lists.newArrayList() : paths;
1485 paths.add(path);
1486 return paths;
1487 });
Charles Chanc91c8782016-03-30 17:54:24 -07001488 }
1489 }
Pier Luigi91573e12018-01-23 16:06:38 +01001490 if (eligiblePaths.isEmpty()) {
1491 log.debug("No eligiblePath(s) found from {} to {}", src, dst);
Pier Luigi91573e12018-01-23 16:06:38 +01001492 Collections.shuffle(allPaths);
1493 return allPaths.stream().findFirst();
1494 }
Pier Luigi91573e12018-01-23 16:06:38 +01001495 // Let's take the best ones
Piere99511d2018-04-19 16:47:06 +02001496 Integer bestIndex = eligiblePaths.keySet().stream()
1497 .sorted(Comparator.reverseOrder()).findFirst().orElse(null);
Pier Luigi91573e12018-01-23 16:06:38 +01001498 List<Path> bestPaths = eligiblePaths.get(bestIndex);
1499 log.debug("{} eligiblePath(s) found from {} to {}",
1500 bestPaths.size(), src, dst);
Pier Luigi91573e12018-01-23 16:06:38 +01001501 Collections.shuffle(bestPaths);
1502 return bestPaths.stream().findFirst();
Charles Chanc91c8782016-03-30 17:54:24 -07001503 }
1504
1505 /**
Piere99511d2018-04-19 16:47:06 +02001506 * Gets device(s) of given role and of given source in given multicast tree.
1507 *
1508 * @param mcastIp multicast IP
1509 * @param role multicast role
1510 * @param source source connect point
1511 * @return set of device ID or empty set if not found
1512 */
1513 private Set<DeviceId> getDevice(IpAddress mcastIp, McastRole role, ConnectPoint source) {
1514 return mcastRoleStore.entrySet().stream()
1515 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
Charles Chanba59dd62018-05-10 22:19:49 +00001516 entry.getKey().source().equals(source) &&
1517 entry.getValue().value() == role)
Piere99511d2018-04-19 16:47:06 +02001518 .map(Entry::getKey).map(McastRoleStoreKey::deviceId).collect(Collectors.toSet());
1519 }
1520
1521 /**
Charles Chan72779502016-04-23 17:36:10 -07001522 * Gets device(s) of given role in given multicast group.
1523 *
1524 * @param mcastIp multicast IP
1525 * @param role multicast role
1526 * @return set of device ID or empty set if not found
1527 */
1528 private Set<DeviceId> getDevice(IpAddress mcastIp, McastRole role) {
1529 return mcastRoleStore.entrySet().stream()
1530 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
1531 entry.getValue().value() == role)
Piere99511d2018-04-19 16:47:06 +02001532 .map(Entry::getKey).map(McastRoleStoreKey::deviceId).collect(Collectors.toSet());
1533 }
1534
1535 /**
1536 * Gets source(s) of given role, given device in given multicast group.
1537 *
1538 * @param mcastIp multicast IP
1539 * @param deviceId device id
1540 * @param role multicast role
1541 * @return set of device ID or empty set if not found
1542 */
1543 private Set<ConnectPoint> getSources(IpAddress mcastIp, DeviceId deviceId, McastRole role) {
1544 return mcastRoleStore.entrySet().stream()
1545 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
1546 entry.getKey().deviceId().equals(deviceId) && entry.getValue().value() == role)
1547 .map(Entry::getKey).map(McastRoleStoreKey::source).collect(Collectors.toSet());
1548 }
1549
1550 /**
1551 * Gets source(s) of given multicast group.
1552 *
1553 * @param mcastIp multicast IP
1554 * @return set of device ID or empty set if not found
1555 */
1556 private Set<ConnectPoint> getSources(IpAddress mcastIp) {
1557 return mcastRoleStore.entrySet().stream()
1558 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp))
1559 .map(Entry::getKey).map(McastRoleStoreKey::source).collect(Collectors.toSet());
Charles Chan72779502016-04-23 17:36:10 -07001560 }
1561
1562 /**
1563 * Gets groups which is affected by the link down event.
1564 *
1565 * @param link link going down
1566 * @return a set of multicast IpAddress
1567 */
1568 private Set<IpAddress> getAffectedGroups(Link link) {
1569 DeviceId deviceId = link.src().deviceId();
1570 PortNumber port = link.src().port();
1571 return mcastNextObjStore.entrySet().stream()
1572 .filter(entry -> entry.getKey().deviceId().equals(deviceId) &&
Piere99511d2018-04-19 16:47:06 +02001573 mcastUtils.getPorts(entry.getValue().value().next()).contains(port))
1574 .map(Entry::getKey).map(McastStoreKey::mcastIp).collect(Collectors.toSet());
Charles Chan72779502016-04-23 17:36:10 -07001575 }
1576
1577 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +01001578 * Gets groups which are affected by the device down event.
1579 *
1580 * @param deviceId device going down
1581 * @return a set of multicast IpAddress
1582 */
1583 private Set<IpAddress> getAffectedGroups(DeviceId deviceId) {
1584 return mcastNextObjStore.entrySet().stream()
1585 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
Pier1f87aca2018-03-14 16:47:32 -07001586 .map(Entry::getKey).map(McastStoreKey::mcastIp)
Pier Luigi580fd8a2018-01-16 10:47:50 +01001587 .collect(Collectors.toSet());
1588 }
1589
1590 /**
Charles Chan72779502016-04-23 17:36:10 -07001591 * Gets the spine-facing port on ingress device of given multicast group.
1592 *
1593 * @param mcastIp multicast IP
Piere99511d2018-04-19 16:47:06 +02001594 * @param ingressDevice the ingress device
1595 * @param source the source connect point
Charles Chan72779502016-04-23 17:36:10 -07001596 * @return spine-facing port on ingress device
1597 */
Charles Chanba59dd62018-05-10 22:19:49 +00001598 private Set<PortNumber> ingressTransitPort(IpAddress mcastIp, DeviceId ingressDevice,
1599 ConnectPoint source) {
Pier1a7e0c02018-03-12 15:00:54 -07001600 ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
Charles Chan72779502016-04-23 17:36:10 -07001601 if (ingressDevice != null) {
Andrea Campanella5b4cd652018-06-05 14:19:21 +02001602 Versioned<NextObjective> nextObjVers = mcastNextObjStore.get(new McastStoreKey(mcastIp, ingressDevice,
1603 mcastUtils.assignedVlan(source)));
1604 if (nextObjVers == null) {
1605 log.warn("Absent next objective for {}", new McastStoreKey(mcastIp, ingressDevice,
1606 mcastUtils.assignedVlan(source)));
1607 return portBuilder.build();
1608 }
1609 NextObjective nextObj = nextObjVers.value();
Pier7b657162018-03-27 11:29:42 -07001610 Set<PortNumber> ports = mcastUtils.getPorts(nextObj.next());
Pier1a7e0c02018-03-12 15:00:54 -07001611 // Let's find out all the ingress-transit ports
Charles Chan72779502016-04-23 17:36:10 -07001612 for (PortNumber port : ports) {
1613 // Spine-facing port should have no subnet and no xconnect
Pier Luigi69f774d2018-02-28 12:10:50 +01001614 if (srManager.deviceConfiguration() != null &&
1615 srManager.deviceConfiguration().getPortSubnets(ingressDevice, port).isEmpty() &&
Charles Chan82f19972016-05-17 13:13:55 -07001616 !srManager.xConnectHandler.hasXConnect(new ConnectPoint(ingressDevice, port))) {
Pier1a7e0c02018-03-12 15:00:54 -07001617 portBuilder.add(port);
Charles Chan72779502016-04-23 17:36:10 -07001618 }
1619 }
1620 }
Pier1a7e0c02018-03-12 15:00:54 -07001621 return portBuilder.build();
Charles Chan72779502016-04-23 17:36:10 -07001622 }
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001623
1624 /**
Pier28164682018-04-17 15:50:43 +02001625 * Verify if a given connect point is sink for this group.
1626 *
1627 * @param mcastIp group address
1628 * @param connectPoint connect point to be verified
Piere99511d2018-04-19 16:47:06 +02001629 * @param source source connect point
Pier28164682018-04-17 15:50:43 +02001630 * @return true if the connect point is sink of the group
1631 */
Charles Chanba59dd62018-05-10 22:19:49 +00001632 private boolean isSinkForGroup(IpAddress mcastIp, ConnectPoint connectPoint,
1633 ConnectPoint source) {
Piere99511d2018-04-19 16:47:06 +02001634 VlanId assignedVlan = mcastUtils.assignedVlan(connectPoint.deviceId().equals(source.deviceId()) ?
1635 source : null);
1636 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, connectPoint.deviceId(), assignedVlan);
Pier28164682018-04-17 15:50:43 +02001637 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
1638 return false;
1639 }
Pier28164682018-04-17 15:50:43 +02001640 NextObjective mcastNext = mcastNextObjStore.get(mcastStoreKey).value();
1641 return mcastUtils.getPorts(mcastNext.next()).contains(connectPoint.port());
1642 }
1643
1644 /**
Piere99511d2018-04-19 16:47:06 +02001645 * Verify if a given connect point is sink for this group and for this source.
1646 *
1647 * @param mcastIp group address
1648 * @param connectPoint connect point to be verified
1649 * @param source source connect point
1650 * @return true if the connect point is sink of the group
1651 */
Charles Chanba59dd62018-05-10 22:19:49 +00001652 private boolean isSinkForSource(IpAddress mcastIp, ConnectPoint connectPoint,
1653 ConnectPoint source) {
Piere99511d2018-04-19 16:47:06 +02001654 boolean isSink = isSinkForGroup(mcastIp, connectPoint, source);
1655 DeviceId device;
1656 if (connectPoint.deviceId().equals(source.deviceId())) {
1657 device = getDevice(mcastIp, INGRESS, source).stream()
1658 .filter(deviceId -> deviceId.equals(connectPoint.deviceId()))
1659 .findFirst().orElse(null);
1660 } else {
1661 device = getDevice(mcastIp, EGRESS, source).stream()
1662 .filter(deviceId -> deviceId.equals(connectPoint.deviceId()))
1663 .findFirst().orElse(null);
1664 }
1665 return isSink && device != null;
1666 }
1667
1668 /**
1669 * Verify if a sink is reachable from this source.
1670 *
1671 * @param mcastIp group address
1672 * @param sink connect point to be verified
1673 * @param source source connect point
1674 * @return true if the connect point is reachable from the source
1675 */
Charles Chanba59dd62018-05-10 22:19:49 +00001676 private boolean isSinkReachable(IpAddress mcastIp, ConnectPoint sink,
1677 ConnectPoint source) {
Piere99511d2018-04-19 16:47:06 +02001678 return sink.deviceId().equals(source.deviceId()) ||
1679 getPath(source.deviceId(), sink.deviceId(), mcastIp, null, source).isPresent();
1680 }
1681
1682 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +01001683 * Updates filtering objective for given device and port.
1684 * It is called in general when the mcast config has been
1685 * changed.
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001686 *
1687 * @param deviceId device ID
1688 * @param portNum ingress port number
1689 * @param vlanId assigned VLAN ID
1690 * @param install true to add, false to remove
1691 */
Charles Chanba59dd62018-05-10 22:19:49 +00001692 public void updateFilterToDevice(DeviceId deviceId, PortNumber portNum,
1693 VlanId vlanId, boolean install) {
Pier Luigi35dab3f2018-01-25 16:16:02 +01001694 lastMcastChange = Instant.now();
1695 mcastLock();
1696 try {
Piere99511d2018-04-19 16:47:06 +02001697 // Iterates over the route and updates properly the filtering objective on the source device.
Pier Luigi35dab3f2018-01-25 16:16:02 +01001698 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
Pierdb27b8d2018-04-17 16:29:56 +02001699 log.debug("Update filter for {}", mcastRoute.group());
Pierdb27b8d2018-04-17 16:29:56 +02001700 if (!mcastUtils.isLeader(mcastRoute.group())) {
1701 log.debug("Skip {} due to lack of leadership", mcastRoute.group());
1702 return;
1703 }
Piere99511d2018-04-19 16:47:06 +02001704 // Get the sources and for each one update properly the filtering objectives
1705 Set<ConnectPoint> sources = srManager.multicastRouteService.sources(mcastRoute);
1706 sources.forEach(source -> {
1707 if (source.deviceId().equals(deviceId) && source.port().equals(portNum)) {
1708 if (install) {
1709 mcastUtils.addFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), INGRESS);
1710 } else {
1711 mcastUtils.removeFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), null);
1712 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001713 }
Piere99511d2018-04-19 16:47:06 +02001714 });
Pier Luigi35dab3f2018-01-25 16:16:02 +01001715 });
1716 } finally {
1717 mcastUnlock();
1718 }
1719 }
1720
1721 /**
1722 * Performs bucket verification operation for all mcast groups in the devices.
1723 * Firstly, it verifies that mcast is stable before trying verification operation.
1724 * Verification consists in creating new nexts with VERIFY operation. Actually,
1725 * the operation is totally delegated to the driver.
1726 */
Piere99511d2018-04-19 16:47:06 +02001727 private final class McastBucketCorrector implements Runnable {
Pier Luigi35dab3f2018-01-25 16:16:02 +01001728
1729 @Override
1730 public void run() {
Pier Luigi35dab3f2018-01-25 16:16:02 +01001731 if (!isMcastStable()) {
1732 return;
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001733 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001734 mcastLock();
1735 try {
1736 // Iterates over the routes and verify the related next objectives
1737 srManager.multicastRouteService.getRoutes()
Piere99511d2018-04-19 16:47:06 +02001738 .stream().map(McastRoute::group)
Pier Luigi35dab3f2018-01-25 16:16:02 +01001739 .forEach(mcastIp -> {
Piere99511d2018-04-19 16:47:06 +02001740 log.trace("Running mcast buckets corrector for mcast group: {}", mcastIp);
1741 // Verify leadership on the operation
1742 if (!mcastUtils.isLeader(mcastIp)) {
1743 log.trace("Skip {} due to lack of leadership", mcastIp);
1744 return;
1745 }
1746 // Get sources and sinks from Mcast Route Service and warn about errors
1747 Set<ConnectPoint> sources = mcastUtils.getSources(mcastIp);
Pier7b657162018-03-27 11:29:42 -07001748 Set<ConnectPoint> sinks = mcastUtils.getSinks(mcastIp).values().stream()
Piere99511d2018-04-19 16:47:06 +02001749 .flatMap(Collection::stream).collect(Collectors.toSet());
1750 // Do not proceed if sources of this group are missing
1751 if (sources.isEmpty()) {
Pier Luigi92e69be2018-03-02 12:53:37 +01001752 if (!sinks.isEmpty()) {
1753 log.warn("Unable to run buckets corrector. " +
Piere99511d2018-04-19 16:47:06 +02001754 "Missing source {} for group {}", sources, mcastIp);
Pier Luigi92e69be2018-03-02 12:53:37 +01001755 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001756 return;
1757 }
Piere99511d2018-04-19 16:47:06 +02001758 sources.forEach(source -> {
1759 // For each group we get current information in the store
1760 // and issue a check of the next objectives in place
1761 Set<DeviceId> ingressDevices = getDevice(mcastIp, INGRESS, source);
1762 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
1763 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
1764 // Do not proceed if ingress devices are missing
1765 if (ingressDevices.isEmpty()) {
1766 if (!sinks.isEmpty()) {
1767 log.warn("Unable to run buckets corrector. " +
1768 "Missing ingress {} for source {} and for group {}",
1769 ingressDevices, source, mcastIp);
1770 }
1771 return;
Pier Luigi35dab3f2018-01-25 16:16:02 +01001772 }
Piere99511d2018-04-19 16:47:06 +02001773 // Create the set of the devices to be processed
1774 ImmutableSet.Builder<DeviceId> devicesBuilder = ImmutableSet.builder();
1775 if (!ingressDevices.isEmpty()) {
1776 devicesBuilder.addAll(ingressDevices);
1777 }
1778 if (!transitDevices.isEmpty()) {
1779 devicesBuilder.addAll(transitDevices);
1780 }
1781 if (!egressDevices.isEmpty()) {
1782 devicesBuilder.addAll(egressDevices);
1783 }
1784 Set<DeviceId> devicesToProcess = devicesBuilder.build();
1785 devicesToProcess.forEach(deviceId -> {
1786 VlanId assignedVlan = mcastUtils.assignedVlan(deviceId.equals(source.deviceId()) ?
1787 source : null);
1788 McastStoreKey currentKey = new McastStoreKey(mcastIp, deviceId, assignedVlan);
1789 if (mcastNextObjStore.containsKey(currentKey)) {
1790 NextObjective currentNext = mcastNextObjStore.get(currentKey).value();
1791 // Rebuild the next objective using assigned vlan
1792 currentNext = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
1793 mcastUtils.getPorts(currentNext.next()), currentNext.id()).verify();
1794 // Send to the flowobjective service
1795 srManager.flowObjectiveService.next(deviceId, currentNext);
1796 } else {
1797 log.warn("Unable to run buckets corrector. " +
1798 "Missing next for {}, for source {} and for group {}",
1799 deviceId, source, mcastIp);
1800 }
1801 });
Pier Luigi35dab3f2018-01-25 16:16:02 +01001802 });
Pier Luigi35dab3f2018-01-25 16:16:02 +01001803 });
1804 } finally {
Pier Luigi35dab3f2018-01-25 16:16:02 +01001805 mcastUnlock();
1806 }
1807
1808 }
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001809 }
Pier Luigi0f9635b2018-01-15 18:06:43 +01001810
Piere99511d2018-04-19 16:47:06 +02001811 /**
1812 * Returns the associated next ids to the mcast groups or to the single
1813 * group if mcastIp is present.
1814 *
1815 * @param mcastIp the group ip
1816 * @return the mapping mcastIp-device to next id
1817 */
Pier Luigi0f9635b2018-01-15 18:06:43 +01001818 public Map<McastStoreKey, Integer> getMcastNextIds(IpAddress mcastIp) {
Pier Luigi0f9635b2018-01-15 18:06:43 +01001819 if (mcastIp != null) {
1820 return mcastNextObjStore.entrySet().stream()
1821 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
Piere99511d2018-04-19 16:47:06 +02001822 .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().value().id()));
Pier Luigi0f9635b2018-01-15 18:06:43 +01001823 }
Pier Luigi0f9635b2018-01-15 18:06:43 +01001824 return mcastNextObjStore.entrySet().stream()
Piere99511d2018-04-19 16:47:06 +02001825 .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().value().id()));
Pier Luigi0f9635b2018-01-15 18:06:43 +01001826 }
1827
Pier71c55772018-04-17 17:25:22 +02001828 /**
1829 * Returns the associated roles to the mcast groups or to the single
1830 * group if mcastIp is present.
1831 *
1832 * @param mcastIp the group ip
1833 * @return the mapping mcastIp-device to mcast role
1834 *
1835 * @deprecated in 1.12 ("Magpie") release.
1836 */
1837 @Deprecated
Pier Luigi69f774d2018-02-28 12:10:50 +01001838 public Map<McastStoreKey, McastRole> getMcastRoles(IpAddress mcastIp) {
Pier Luigi0f9635b2018-01-15 18:06:43 +01001839 if (mcastIp != null) {
1840 return mcastRoleStore.entrySet().stream()
1841 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
Piere99511d2018-04-19 16:47:06 +02001842 .collect(Collectors.toMap(entry -> new McastStoreKey(entry.getKey().mcastIp(),
1843 entry.getKey().deviceId(), null), entry -> entry.getValue().value()));
Pier Luigi0f9635b2018-01-15 18:06:43 +01001844 }
Pier Luigi0f9635b2018-01-15 18:06:43 +01001845 return mcastRoleStore.entrySet().stream()
Piere99511d2018-04-19 16:47:06 +02001846 .collect(Collectors.toMap(entry -> new McastStoreKey(entry.getKey().mcastIp(),
1847 entry.getKey().deviceId(), null), entry -> entry.getValue().value()));
Pier Luigi0f9635b2018-01-15 18:06:43 +01001848 }
1849
Pier71c55772018-04-17 17:25:22 +02001850 /**
Piere99511d2018-04-19 16:47:06 +02001851 * Returns the associated roles to the mcast groups.
1852 *
1853 * @param mcastIp the group ip
1854 * @param sourcecp the source connect point
1855 * @return the mapping mcastIp-device to mcast role
1856 */
Charles Chanba59dd62018-05-10 22:19:49 +00001857 public Map<McastRoleStoreKey, McastRole> getMcastRoles(IpAddress mcastIp,
1858 ConnectPoint sourcecp) {
Piere99511d2018-04-19 16:47:06 +02001859 if (mcastIp != null) {
1860 Map<McastRoleStoreKey, McastRole> roles = mcastRoleStore.entrySet().stream()
1861 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
1862 .collect(Collectors.toMap(entry -> new McastRoleStoreKey(entry.getKey().mcastIp(),
1863 entry.getKey().deviceId(), entry.getKey().source()), entry -> entry.getValue().value()));
1864 if (sourcecp != null) {
1865 roles = roles.entrySet().stream()
1866 .filter(mcastEntry -> sourcecp.equals(mcastEntry.getKey().source()))
1867 .collect(Collectors.toMap(entry -> new McastRoleStoreKey(entry.getKey().mcastIp(),
1868 entry.getKey().deviceId(), entry.getKey().source()), Entry::getValue));
1869 }
1870 return roles;
1871 }
1872 return mcastRoleStore.entrySet().stream()
1873 .collect(Collectors.toMap(entry -> new McastRoleStoreKey(entry.getKey().mcastIp(),
1874 entry.getKey().deviceId(), entry.getKey().source()), entry -> entry.getValue().value()));
1875 }
1876
1877
1878 /**
Pier71c55772018-04-17 17:25:22 +02001879 * Returns the associated paths to the mcast group.
1880 *
1881 * @param mcastIp the group ip
1882 * @return the mapping egress point to mcast path
1883 *
1884 * @deprecated in 1.12 ("Magpie") release.
1885 */
1886 @Deprecated
Pier Luigi0f9635b2018-01-15 18:06:43 +01001887 public Map<ConnectPoint, List<ConnectPoint>> getMcastPaths(IpAddress mcastIp) {
1888 Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
Pier7b657162018-03-27 11:29:42 -07001889 ConnectPoint source = mcastUtils.getSource(mcastIp);
Pier Luigi0f9635b2018-01-15 18:06:43 +01001890 if (source != null) {
Pier Luigi0f9635b2018-01-15 18:06:43 +01001891 Set<DeviceId> visited = Sets.newHashSet();
Piere99511d2018-04-19 16:47:06 +02001892 List<ConnectPoint> currentPath = Lists.newArrayList(source);
1893 buildMcastPaths(source.deviceId(), visited, mcastPaths, currentPath, mcastIp, source);
Pier Luigi0f9635b2018-01-15 18:06:43 +01001894 }
1895 return mcastPaths;
1896 }
1897
Pier71c55772018-04-17 17:25:22 +02001898 /**
1899 * Returns the associated trees to the mcast group.
1900 *
1901 * @param mcastIp the group ip
1902 * @param sourcecp the source connect point
1903 * @return the mapping egress point to mcast path
1904 */
Charles Chanba59dd62018-05-10 22:19:49 +00001905 public Multimap<ConnectPoint, List<ConnectPoint>> getMcastTrees(IpAddress mcastIp,
1906 ConnectPoint sourcecp) {
Pier71c55772018-04-17 17:25:22 +02001907 Multimap<ConnectPoint, List<ConnectPoint>> mcastTrees = HashMultimap.create();
Pier71c55772018-04-17 17:25:22 +02001908 Set<ConnectPoint> sources = mcastUtils.getSources(mcastIp);
Pier71c55772018-04-17 17:25:22 +02001909 if (sourcecp != null) {
1910 sources = sources.stream()
Piere99511d2018-04-19 16:47:06 +02001911 .filter(source -> source.equals(sourcecp)).collect(Collectors.toSet());
Pier71c55772018-04-17 17:25:22 +02001912 }
Pier71c55772018-04-17 17:25:22 +02001913 if (!sources.isEmpty()) {
1914 sources.forEach(source -> {
Pier71c55772018-04-17 17:25:22 +02001915 Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
1916 Set<DeviceId> visited = Sets.newHashSet();
1917 List<ConnectPoint> currentPath = Lists.newArrayList(source);
Piere99511d2018-04-19 16:47:06 +02001918 buildMcastPaths(source.deviceId(), visited, mcastPaths, currentPath, mcastIp, source);
Pier71c55772018-04-17 17:25:22 +02001919 mcastPaths.forEach(mcastTrees::put);
1920 });
1921 }
1922 return mcastTrees;
1923 }
1924
1925 /**
1926 * Build recursively the mcast paths.
1927 *
1928 * @param toVisit the node to visit
1929 * @param visited the visited nodes
1930 * @param mcastPaths the current mcast paths
1931 * @param currentPath the current path
1932 * @param mcastIp the group ip
Piere99511d2018-04-19 16:47:06 +02001933 * @param source the source
Pier71c55772018-04-17 17:25:22 +02001934 */
Charles Chanba59dd62018-05-10 22:19:49 +00001935 private void buildMcastPaths(DeviceId toVisit, Set<DeviceId> visited,
1936 Map<ConnectPoint, List<ConnectPoint>> mcastPaths,
1937 List<ConnectPoint> currentPath, IpAddress mcastIp,
1938 ConnectPoint source) {
Piere99511d2018-04-19 16:47:06 +02001939 // If we have visited the node to visit there is a loop
Pier Luigi0f9635b2018-01-15 18:06:43 +01001940 if (visited.contains(toVisit)) {
1941 return;
1942 }
1943 // Visit next-hop
1944 visited.add(toVisit);
Piere99511d2018-04-19 16:47:06 +02001945 VlanId assignedVlan = mcastUtils.assignedVlan(toVisit.equals(source.deviceId()) ? source : null);
1946 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, toVisit, assignedVlan);
Pier Luigi0f9635b2018-01-15 18:06:43 +01001947 // Looking for next-hops
1948 if (mcastNextObjStore.containsKey(mcastStoreKey)) {
Piere99511d2018-04-19 16:47:06 +02001949 // Build egress connect points, get ports and build relative cps
Pier Luigi0f9635b2018-01-15 18:06:43 +01001950 NextObjective nextObjective = mcastNextObjStore.get(mcastStoreKey).value();
Pier7b657162018-03-27 11:29:42 -07001951 Set<PortNumber> outputPorts = mcastUtils.getPorts(nextObjective.next());
Pier Luigi0f9635b2018-01-15 18:06:43 +01001952 ImmutableSet.Builder<ConnectPoint> cpBuilder = ImmutableSet.builder();
1953 outputPorts.forEach(portNumber -> cpBuilder.add(new ConnectPoint(toVisit, portNumber)));
1954 Set<ConnectPoint> egressPoints = cpBuilder.build();
Pier Luigi0f9635b2018-01-15 18:06:43 +01001955 Set<Link> egressLinks;
1956 List<ConnectPoint> newCurrentPath;
1957 Set<DeviceId> newVisited;
1958 DeviceId newToVisit;
1959 for (ConnectPoint egressPoint : egressPoints) {
1960 egressLinks = srManager.linkService.getEgressLinks(egressPoint);
1961 // If it does not have egress links, stop
1962 if (egressLinks.isEmpty()) {
1963 // Add the connect points to the path
1964 newCurrentPath = Lists.newArrayList(currentPath);
1965 newCurrentPath.add(0, egressPoint);
Pier Luigi0f9635b2018-01-15 18:06:43 +01001966 mcastPaths.put(egressPoint, newCurrentPath);
1967 } else {
1968 newVisited = Sets.newHashSet(visited);
1969 // Iterate over the egress links for the next hops
1970 for (Link egressLink : egressLinks) {
Pier Luigi0f9635b2018-01-15 18:06:43 +01001971 newToVisit = egressLink.dst().deviceId();
Pier Luigi0f9635b2018-01-15 18:06:43 +01001972 newCurrentPath = Lists.newArrayList(currentPath);
1973 newCurrentPath.add(0, egressPoint);
1974 newCurrentPath.add(0, egressLink.dst());
Piere99511d2018-04-19 16:47:06 +02001975 buildMcastPaths(newToVisit, newVisited, mcastPaths, newCurrentPath, mcastIp, source);
Pier Luigi0f9635b2018-01-15 18:06:43 +01001976 }
1977 }
1978 }
1979 }
1980 }
1981
Pierdb27b8d2018-04-17 16:29:56 +02001982 /**
1983 * Return the leaders of the mcast groups.
1984 *
1985 * @param mcastIp the group ip
1986 * @return the mapping group-node
1987 */
1988 public Map<IpAddress, NodeId> getMcastLeaders(IpAddress mcastIp) {
1989 return mcastUtils.getMcastLeaders(mcastIp);
1990 }
Charles Chanc91c8782016-03-30 17:54:24 -07001991}