blob: 34d7dbd6d9bc7be7d990dac8e97632b90b327034 [file] [log] [blame]
Charles Chanc91c8782016-03-30 17:54:24 -07001/*
Pier Luigi69f774d2018-02-28 12:10:50 +01002 * Copyright 2018-present Open Networking Foundation
Charles Chanc91c8782016-03-30 17:54:24 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Pier Luigi69f774d2018-02-28 12:10:50 +010017package org.onosproject.segmentrouting.mcast;
Charles Chanc91c8782016-03-30 17:54:24 -070018
Piere99511d2018-04-19 16:47:06 +020019import com.google.common.base.Objects;
Pier Luigid29ca7c2018-02-28 17:24:03 +010020import com.google.common.cache.Cache;
21import com.google.common.cache.CacheBuilder;
22import com.google.common.cache.RemovalCause;
23import com.google.common.cache.RemovalNotification;
Pier71c55772018-04-17 17:25:22 +020024import com.google.common.collect.HashMultimap;
Pier7b657162018-03-27 11:29:42 -070025import com.google.common.collect.ImmutableList;
Charles Chanc91c8782016-03-30 17:54:24 -070026import com.google.common.collect.ImmutableSet;
27import com.google.common.collect.Lists;
Pier Luigi91573e12018-01-23 16:06:38 +010028import com.google.common.collect.Maps;
Pier71c55772018-04-17 17:25:22 +020029import com.google.common.collect.Multimap;
Charles Chanc91c8782016-03-30 17:54:24 -070030import com.google.common.collect.Sets;
Charles Chanc91c8782016-03-30 17:54:24 -070031import org.onlab.packet.IpAddress;
Charles Chanc91c8782016-03-30 17:54:24 -070032import org.onlab.packet.VlanId;
33import org.onlab.util.KryoNamespace;
Pierdb27b8d2018-04-17 16:29:56 +020034import org.onosproject.cluster.NodeId;
Charles Chanc91c8782016-03-30 17:54:24 -070035import org.onosproject.core.ApplicationId;
36import org.onosproject.core.CoreService;
Pier1f87aca2018-03-14 16:47:32 -070037import org.onosproject.mcast.api.McastEvent;
38import org.onosproject.mcast.api.McastRoute;
Pier7b657162018-03-27 11:29:42 -070039import org.onosproject.mcast.api.McastRouteData;
Pier1f87aca2018-03-14 16:47:32 -070040import org.onosproject.mcast.api.McastRouteUpdate;
Charles Chanc91c8782016-03-30 17:54:24 -070041import org.onosproject.net.ConnectPoint;
42import org.onosproject.net.DeviceId;
Andrea Campanellaef30d7a2018-04-27 14:44:15 +020043import org.onosproject.net.HostId;
Charles Chanc91c8782016-03-30 17:54:24 -070044import org.onosproject.net.Link;
45import org.onosproject.net.Path;
46import org.onosproject.net.PortNumber;
Charles Chan72779502016-04-23 17:36:10 -070047import org.onosproject.net.flowobjective.DefaultObjectiveContext;
Charles Chanc91c8782016-03-30 17:54:24 -070048import org.onosproject.net.flowobjective.ForwardingObjective;
49import org.onosproject.net.flowobjective.NextObjective;
Charles Chan72779502016-04-23 17:36:10 -070050import org.onosproject.net.flowobjective.ObjectiveContext;
Pier1f87aca2018-03-14 16:47:32 -070051import org.onosproject.net.topology.LinkWeigher;
Pier Luigid8a15162018-02-15 16:33:08 +010052import org.onosproject.net.topology.Topology;
Charles Chanc91c8782016-03-30 17:54:24 -070053import org.onosproject.net.topology.TopologyService;
Pier1f87aca2018-03-14 16:47:32 -070054import org.onosproject.segmentrouting.SRLinkWeigher;
Pier Luigi69f774d2018-02-28 12:10:50 +010055import org.onosproject.segmentrouting.SegmentRoutingManager;
Andrea Campanellaef30d7a2018-04-27 14:44:15 +020056import org.onosproject.segmentrouting.config.DeviceConfigNotFoundException;
Charles Chanc91c8782016-03-30 17:54:24 -070057import org.onosproject.store.serializers.KryoNamespaces;
58import org.onosproject.store.service.ConsistentMap;
59import org.onosproject.store.service.Serializer;
Charles Chanc91c8782016-03-30 17:54:24 -070060import org.slf4j.Logger;
61import org.slf4j.LoggerFactory;
62
Pier Luigi35dab3f2018-01-25 16:16:02 +010063import java.time.Instant;
Charles Chanc91c8782016-03-30 17:54:24 -070064import java.util.Collection;
65import java.util.Collections;
Pier Luigi91573e12018-01-23 16:06:38 +010066import java.util.Comparator;
Charles Chanc91c8782016-03-30 17:54:24 -070067import java.util.List;
Charles Chan72779502016-04-23 17:36:10 -070068import java.util.Map;
Pier1f87aca2018-03-14 16:47:32 -070069import java.util.Map.Entry;
Charles Chanc91c8782016-03-30 17:54:24 -070070import java.util.Optional;
71import java.util.Set;
Pier Luigi35dab3f2018-01-25 16:16:02 +010072import java.util.concurrent.ScheduledExecutorService;
73import java.util.concurrent.TimeUnit;
74import java.util.concurrent.locks.Lock;
75import java.util.concurrent.locks.ReentrantLock;
Charles Chan72779502016-04-23 17:36:10 -070076import java.util.stream.Collectors;
77
Pier Luigi35dab3f2018-01-25 16:16:02 +010078import static java.util.concurrent.Executors.newScheduledThreadPool;
79import static org.onlab.util.Tools.groupedThreads;
Pierdb27b8d2018-04-17 16:29:56 +020080import static org.onosproject.mcast.api.McastEvent.Type.ROUTE_ADDED;
Pier7b657162018-03-27 11:29:42 -070081import static org.onosproject.mcast.api.McastEvent.Type.ROUTE_REMOVED;
Pierdb27b8d2018-04-17 16:29:56 +020082import static org.onosproject.mcast.api.McastEvent.Type.SINKS_ADDED;
83import static org.onosproject.mcast.api.McastEvent.Type.SINKS_REMOVED;
Andrea Campanellaef30d7a2018-04-27 14:44:15 +020084import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_ADDED;
85import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_REMOVED;
Pier979e61a2018-03-07 11:42:50 +010086import static org.onosproject.segmentrouting.mcast.McastRole.EGRESS;
87import static org.onosproject.segmentrouting.mcast.McastRole.INGRESS;
88import static org.onosproject.segmentrouting.mcast.McastRole.TRANSIT;
Charles Chanc91c8782016-03-30 17:54:24 -070089
90/**
Pier Luigi69f774d2018-02-28 12:10:50 +010091 * Handles Multicast related events.
Charles Chanc91c8782016-03-30 17:54:24 -070092 */
Charles Chan1eaf4802016-04-18 13:44:03 -070093public class McastHandler {
94 private static final Logger log = LoggerFactory.getLogger(McastHandler.class);
Charles Chanc91c8782016-03-30 17:54:24 -070095 private final SegmentRoutingManager srManager;
Charles Chan82f19972016-05-17 13:13:55 -070096 private final TopologyService topologyService;
Pierdb27b8d2018-04-17 16:29:56 +020097 private final McastUtils mcastUtils;
Charles Chan72779502016-04-23 17:36:10 -070098 private final ConsistentMap<McastStoreKey, NextObjective> mcastNextObjStore;
Piere99511d2018-04-19 16:47:06 +020099 private final ConsistentMap<McastRoleStoreKey, McastRole> mcastRoleStore;
Charles Chan72779502016-04-23 17:36:10 -0700100
Pier Luigid29ca7c2018-02-28 17:24:03 +0100101 // Wait time for the cache
102 private static final int WAIT_TIME_MS = 1000;
Pier7b657162018-03-27 11:29:42 -0700103
Piere99511d2018-04-19 16:47:06 +0200104 //The mcastEventCache is implemented to avoid race condition by giving more time
105 // to the underlying subsystems to process previous calls.
Pier Luigid29ca7c2018-02-28 17:24:03 +0100106 private Cache<McastCacheKey, McastEvent> mcastEventCache = CacheBuilder.newBuilder()
107 .expireAfterWrite(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
108 .removalListener((RemovalNotification<McastCacheKey, McastEvent> notification) -> {
Pier Luigid29ca7c2018-02-28 17:24:03 +0100109 IpAddress mcastIp = notification.getKey().mcastIp();
Pier7b657162018-03-27 11:29:42 -0700110 HostId sink = notification.getKey().sinkHost();
Pier Luigid29ca7c2018-02-28 17:24:03 +0100111 McastEvent mcastEvent = notification.getValue();
112 RemovalCause cause = notification.getCause();
113 log.debug("mcastEventCache removal event. group={}, sink={}, mcastEvent={}, cause={}",
114 mcastIp, sink, mcastEvent, cause);
Piere99511d2018-04-19 16:47:06 +0200115 // If it expires or it has been replaced, we deque the event - no when evicted
Pier Luigid29ca7c2018-02-28 17:24:03 +0100116 switch (notification.getCause()) {
117 case REPLACED:
118 case EXPIRED:
119 dequeueMcastEvent(mcastEvent);
120 break;
121 default:
122 break;
123 }
124 }).build();
125
126 private void enqueueMcastEvent(McastEvent mcastEvent) {
Pier1f87aca2018-03-14 16:47:32 -0700127 final McastRouteUpdate mcastRouteUpdate = mcastEvent.subject();
Pier7b657162018-03-27 11:29:42 -0700128 final McastRouteUpdate mcastRoutePrevUpdate = mcastEvent.prevSubject();
129 final IpAddress group = mcastRoutePrevUpdate.route().group();
Pier7b657162018-03-27 11:29:42 -0700130 ImmutableSet.Builder<HostId> sinksBuilder = ImmutableSet.builder();
Pier1f87aca2018-03-14 16:47:32 -0700131 if (mcastEvent.type() == SOURCES_ADDED ||
132 mcastEvent.type() == SOURCES_REMOVED) {
Piere99511d2018-04-19 16:47:06 +0200133 // Current subject and prev just differ for the source connect points
134 sinksBuilder.addAll(mcastRouteUpdate.sinks().keySet());
Pier7b657162018-03-27 11:29:42 -0700135 } else if (mcastEvent.type() == SINKS_ADDED) {
Pier7b657162018-03-27 11:29:42 -0700136 mcastRouteUpdate.sinks().forEach(((hostId, connectPoints) -> {
137 // Get the previous locations and verify if there are changes
138 Set<ConnectPoint> prevConnectPoints = mcastRoutePrevUpdate.sinks().get(hostId);
139 Set<ConnectPoint> changes = Sets.difference(connectPoints, prevConnectPoints != null ?
140 prevConnectPoints : Collections.emptySet());
141 if (!changes.isEmpty()) {
142 sinksBuilder.add(hostId);
Pier1f87aca2018-03-14 16:47:32 -0700143 }
Pier7b657162018-03-27 11:29:42 -0700144 }));
145 } else if (mcastEvent.type() == SINKS_REMOVED) {
Pier7b657162018-03-27 11:29:42 -0700146 mcastRoutePrevUpdate.sinks().forEach(((hostId, connectPoints) -> {
147 // Get the current locations and verify if there are changes
148 Set<ConnectPoint> currentConnectPoints = mcastRouteUpdate.sinks().get(hostId);
149 Set<ConnectPoint> changes = Sets.difference(connectPoints, currentConnectPoints != null ?
150 currentConnectPoints : Collections.emptySet());
151 if (!changes.isEmpty()) {
152 sinksBuilder.add(hostId);
153 }
154 }));
155 } else if (mcastEvent.type() == ROUTE_REMOVED) {
156 // Current subject is null, just take the previous host ids
157 sinksBuilder.addAll(mcastRoutePrevUpdate.sinks().keySet());
Pier Luigid29ca7c2018-02-28 17:24:03 +0100158 }
Pier Luigid29ca7c2018-02-28 17:24:03 +0100159 sinksBuilder.build().forEach(sink -> {
Pier1f87aca2018-03-14 16:47:32 -0700160 McastCacheKey cacheKey = new McastCacheKey(group, sink);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100161 mcastEventCache.put(cacheKey, mcastEvent);
162 });
163 }
164
165 private void dequeueMcastEvent(McastEvent mcastEvent) {
Pier7b657162018-03-27 11:29:42 -0700166 final McastRouteUpdate mcastUpdate = mcastEvent.subject();
167 final McastRouteUpdate mcastPrevUpdate = mcastEvent.prevSubject();
Pier7b657162018-03-27 11:29:42 -0700168 IpAddress mcastIp = mcastPrevUpdate.route().group();
Pier7b657162018-03-27 11:29:42 -0700169 Set<ConnectPoint> prevSinks = mcastPrevUpdate.sinks()
Piere99511d2018-04-19 16:47:06 +0200170 .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
171 Set<ConnectPoint> prevSources = mcastPrevUpdate.sources()
172 .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
173 Set<ConnectPoint> sources;
Pier Luigid29ca7c2018-02-28 17:24:03 +0100174 switch (mcastEvent.type()) {
Pier1f87aca2018-03-14 16:47:32 -0700175 case SOURCES_ADDED:
Piere99511d2018-04-19 16:47:06 +0200176 sources = mcastUpdate.sources()
177 .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
178 Set<ConnectPoint> sourcesToBeAdded = Sets.difference(sources, prevSources);
179 processSourcesAddedInternal(sourcesToBeAdded, mcastIp, mcastUpdate.sinks());
Pier Luigid29ca7c2018-02-28 17:24:03 +0100180 break;
Pier1f87aca2018-03-14 16:47:32 -0700181 case SOURCES_REMOVED:
Piere99511d2018-04-19 16:47:06 +0200182 sources = mcastUpdate.sources()
183 .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
184 Set<ConnectPoint> sourcesToBeRemoved = Sets.difference(prevSources, sources);
185 processSourcesRemovedInternal(sourcesToBeRemoved, sources, mcastIp, mcastUpdate.sinks());
Pier Luigid29ca7c2018-02-28 17:24:03 +0100186 break;
187 case ROUTE_REMOVED:
Piere99511d2018-04-19 16:47:06 +0200188 processRouteRemovedInternal(prevSources, mcastIp);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100189 break;
Pier1f87aca2018-03-14 16:47:32 -0700190 case SINKS_ADDED:
Piere99511d2018-04-19 16:47:06 +0200191 processSinksAddedInternal(prevSources, mcastIp, mcastUpdate.sinks(), prevSinks);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100192 break;
Pier1f87aca2018-03-14 16:47:32 -0700193 case SINKS_REMOVED:
Piere99511d2018-04-19 16:47:06 +0200194 processSinksRemovedInternal(prevSources, mcastIp, mcastUpdate.sinks(), mcastPrevUpdate.sinks());
Pier Luigid29ca7c2018-02-28 17:24:03 +0100195 break;
196 default:
197 break;
198 }
199 }
200
Pier Luigi35dab3f2018-01-25 16:16:02 +0100201 // Mcast lock to serialize local operations
202 private final Lock mcastLock = new ReentrantLock();
Pier Luigi35dab3f2018-01-25 16:16:02 +0100203 private void mcastLock() {
204 mcastLock.lock();
205 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100206 private void mcastUnlock() {
207 mcastLock.unlock();
208 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100209 // Stability threshold for Mcast. Seconds
210 private static final long MCAST_STABLITY_THRESHOLD = 5;
211 // Last change done
212 private Instant lastMcastChange = Instant.now();
213
214 /**
215 * Determines if mcast in the network has been stable in the last
216 * MCAST_STABLITY_THRESHOLD seconds, by comparing the current time
217 * to the last mcast change timestamp.
218 *
219 * @return true if stable
220 */
221 private boolean isMcastStable() {
222 long last = (long) (lastMcastChange.toEpochMilli() / 1000.0);
223 long now = (long) (Instant.now().toEpochMilli() / 1000.0);
Saurav Das97241862018-02-14 14:14:54 -0800224 log.trace("Mcast stable since {}s", now - last);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100225 return (now - last) > MCAST_STABLITY_THRESHOLD;
226 }
227
Piere99511d2018-04-19 16:47:06 +0200228 // Verify interval for Mcast bucket corrector
Pier Luigi35dab3f2018-01-25 16:16:02 +0100229 private static final long MCAST_VERIFY_INTERVAL = 30;
Piere99511d2018-04-19 16:47:06 +0200230 // Executor for mcast bucket corrector and for cache
Pier Luigi35dab3f2018-01-25 16:16:02 +0100231 private ScheduledExecutorService executorService
Pier Luigid29ca7c2018-02-28 17:24:03 +0100232 = newScheduledThreadPool(1, groupedThreads("mcastWorker", "mcastWorker-%d", log));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100233
Charles Chan72779502016-04-23 17:36:10 -0700234 /**
Charles Chanc91c8782016-03-30 17:54:24 -0700235 * Constructs the McastEventHandler.
236 *
237 * @param srManager Segment Routing manager
238 */
Charles Chan1eaf4802016-04-18 13:44:03 -0700239 public McastHandler(SegmentRoutingManager srManager) {
Pier7b657162018-03-27 11:29:42 -0700240 ApplicationId coreAppId = srManager.coreService.getAppId(CoreService.CORE_APP_NAME);
Charles Chanc91c8782016-03-30 17:54:24 -0700241 this.srManager = srManager;
Charles Chanc91c8782016-03-30 17:54:24 -0700242 this.topologyService = srManager.topologyService;
Pier7b657162018-03-27 11:29:42 -0700243 KryoNamespace.Builder mcastKryo = new KryoNamespace.Builder()
Charles Chanc91c8782016-03-30 17:54:24 -0700244 .register(KryoNamespaces.API)
Piere99511d2018-04-19 16:47:06 +0200245 .register(new McastStoreKeySerializer(), McastStoreKey.class);
Pier7b657162018-03-27 11:29:42 -0700246 mcastNextObjStore = srManager.storageService
Charles Chan72779502016-04-23 17:36:10 -0700247 .<McastStoreKey, NextObjective>consistentMapBuilder()
Charles Chanc91c8782016-03-30 17:54:24 -0700248 .withName("onos-mcast-nextobj-store")
Charles Chan4922a172016-05-23 16:45:45 -0700249 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-NextObj")))
Charles Chanc91c8782016-03-30 17:54:24 -0700250 .build();
Piere99511d2018-04-19 16:47:06 +0200251 mcastKryo = new KryoNamespace.Builder()
252 .register(KryoNamespaces.API)
253 .register(new McastRoleStoreKeySerializer(), McastRoleStoreKey.class)
254 .register(McastRole.class);
Pier7b657162018-03-27 11:29:42 -0700255 mcastRoleStore = srManager.storageService
Piere99511d2018-04-19 16:47:06 +0200256 .<McastRoleStoreKey, McastRole>consistentMapBuilder()
Charles Chan72779502016-04-23 17:36:10 -0700257 .withName("onos-mcast-role-store")
Charles Chan4922a172016-05-23 16:45:45 -0700258 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-Role")))
Charles Chan72779502016-04-23 17:36:10 -0700259 .build();
Pier7b657162018-03-27 11:29:42 -0700260 mcastUtils = new McastUtils(srManager, coreAppId, log);
Piere99511d2018-04-19 16:47:06 +0200261 // Init the executor service, the buckets corrector and schedule the clean up
Pier Luigi35dab3f2018-01-25 16:16:02 +0100262 executorService.scheduleWithFixedDelay(new McastBucketCorrector(), 10,
Pier7b657162018-03-27 11:29:42 -0700263 MCAST_VERIFY_INTERVAL, TimeUnit.SECONDS);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100264 executorService.scheduleAtFixedRate(mcastEventCache::cleanUp, 0,
265 WAIT_TIME_MS, TimeUnit.MILLISECONDS);
Charles Chan72779502016-04-23 17:36:10 -0700266 }
267
268 /**
Piere99511d2018-04-19 16:47:06 +0200269 * Read initial multicast configuration from mcast store.
Charles Chan72779502016-04-23 17:36:10 -0700270 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100271 public void init() {
Pier7b657162018-03-27 11:29:42 -0700272 lastMcastChange = Instant.now();
273 mcastLock();
274 try {
Andrea Campanellaef30d7a2018-04-27 14:44:15 +0200275 // Installing rules to drop any multicast traffic for which a tree is not programmed.
276 srManager.deviceService.getAvailableDevices().forEach(device -> {
277 log.debug("Programming mcast drop flows");
278 dropUnprogrammedTrees(device.id());
279 });
Pier7b657162018-03-27 11:29:42 -0700280 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
Piere99511d2018-04-19 16:47:06 +0200281 log.debug("Init group {}", mcastRoute.group());
Pierdb27b8d2018-04-17 16:29:56 +0200282 if (!mcastUtils.isLeader(mcastRoute.group())) {
283 log.debug("Skip {} due to lack of leadership", mcastRoute.group());
284 return;
285 }
Pier7b657162018-03-27 11:29:42 -0700286 McastRouteData mcastRouteData = srManager.multicastRouteService.routeData(mcastRoute);
Piere99511d2018-04-19 16:47:06 +0200287 // For each source process the mcast tree
288 srManager.multicastRouteService.sources(mcastRoute).forEach(source -> {
289 Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
290 Set<DeviceId> visited = Sets.newHashSet();
291 List<ConnectPoint> currentPath = Lists.newArrayList(source);
292 buildMcastPaths(source.deviceId(), visited, mcastPaths,
293 currentPath, mcastRoute.group(), source);
294 // Get all the sinks and process them
295 Set<ConnectPoint> sinks = processSinksToBeAdded(source, mcastRoute.group(),
296 mcastRouteData.sinks());
297 // Filter out all the working sinks, we do not want to move them
298 // TODO we need a better way to distinguish flows coming from different sources
299 sinks = sinks.stream()
300 .filter(sink -> !mcastPaths.containsKey(sink) ||
301 !isSinkForSource(mcastRoute.group(), sink, source))
302 .collect(Collectors.toSet());
303 if (sinks.isEmpty()) {
304 log.debug("Skip {} for source {} nothing to do", mcastRoute.group(), source);
305 return;
306 }
307 Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(source.deviceId(), sinks);
308 mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink,
309 mcastRoute.group(), paths));
310 });
Pier7b657162018-03-27 11:29:42 -0700311 });
312 } finally {
313 mcastUnlock();
314 }
Charles Chanc91c8782016-03-30 17:54:24 -0700315 }
316
317 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +0100318 * Clean up when deactivating the application.
319 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100320 public void terminate() {
Pier72d0e582018-04-20 14:14:34 +0200321 mcastEventCache.invalidateAll();
Pier Luigi35dab3f2018-01-25 16:16:02 +0100322 executorService.shutdown();
Pier72d0e582018-04-20 14:14:34 +0200323 mcastNextObjStore.destroy();
324 mcastRoleStore.destroy();
325 mcastUtils.terminate();
326 log.info("Terminated");
Pier Luigi35dab3f2018-01-25 16:16:02 +0100327 }
328
329 /**
Pier Luigid29ca7c2018-02-28 17:24:03 +0100330 * Processes the SOURCE_ADDED, SOURCE_UPDATED, SINK_ADDED,
Piere99511d2018-04-19 16:47:06 +0200331 * SINK_REMOVED, ROUTE_ADDED and ROUTE_REMOVED events.
Charles Chanc91c8782016-03-30 17:54:24 -0700332 *
333 * @param event McastEvent with SOURCE_ADDED type
334 */
Pier Luigid29ca7c2018-02-28 17:24:03 +0100335 public void processMcastEvent(McastEvent event) {
Andrea Campanellaef30d7a2018-04-27 14:44:15 +0200336 log.debug("process {}", event);
Pierdb27b8d2018-04-17 16:29:56 +0200337 // If it is a route added, we do not enqueue
338 if (event.type() == ROUTE_ADDED) {
Pierdb27b8d2018-04-17 16:29:56 +0200339 processRouteAddedInternal(event.subject().route().group());
340 } else {
Pierdb27b8d2018-04-17 16:29:56 +0200341 enqueueMcastEvent(event);
342 }
Pier Luigi6786b922018-02-02 16:19:11 +0100343 }
344
345 /**
Piere99511d2018-04-19 16:47:06 +0200346 * Process the SOURCES_ADDED event.
347 *
348 * @param sources the sources connect point
349 * @param mcastIp the group address
350 * @param sinks the sinks connect points
351 */
352 private void processSourcesAddedInternal(Set<ConnectPoint> sources, IpAddress mcastIp,
353 Map<HostId, Set<ConnectPoint>> sinks) {
354 lastMcastChange = Instant.now();
355 mcastLock();
356 try {
357 log.debug("Processing sources added {} for group {}", sources, mcastIp);
358 if (!mcastUtils.isLeader(mcastIp)) {
359 log.debug("Skip {} due to lack of leadership", mcastIp);
360 return;
361 }
362 sources.forEach(source -> {
363 Set<ConnectPoint> sinksToBeAdded = processSinksToBeAdded(source, mcastIp, sinks);
364 Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(source.deviceId(), sinksToBeAdded);
365 mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink, mcastIp, paths));
366 });
367 } finally {
368 mcastUnlock();
369 }
370 }
371
372 /**
373 * Process the SOURCES_REMOVED event.
374 *
375 * @param sourcesToBeRemoved the source connect points to be removed
376 * @param remainingSources the remainig source connect points
377 * @param mcastIp the group address
378 * @param sinks the sinks connect points
379 */
380 private void processSourcesRemovedInternal(Set<ConnectPoint> sourcesToBeRemoved,
381 Set<ConnectPoint> remainingSources,
382 IpAddress mcastIp,
383 Map<HostId, Set<ConnectPoint>> sinks) {
384 lastMcastChange = Instant.now();
385 mcastLock();
386 try {
387 log.debug("Processing sources removed {} for group {}", sourcesToBeRemoved, mcastIp);
388 if (!mcastUtils.isLeader(mcastIp)) {
389 log.debug("Skip {} due to lack of leadership", mcastIp);
390 return;
391 }
392 if (remainingSources.isEmpty()) {
393 processRouteRemovedInternal(sourcesToBeRemoved, mcastIp);
394 return;
395 }
396 // Skip offline devices
397 Set<ConnectPoint> candidateSources = sourcesToBeRemoved.stream()
398 .filter(source -> srManager.deviceService.isAvailable(source.deviceId()))
399 .collect(Collectors.toSet());
400 if (candidateSources.isEmpty()) {
401 log.debug("Skip {} due to empty sources to be removed", mcastIp);
402 return;
403 }
404 Set<Link> remainingLinks = Sets.newHashSet();
405 Map<ConnectPoint, Set<Link>> candidateLinks = Maps.newHashMap();
406 Map<ConnectPoint, Set<ConnectPoint>> candidateSinks = Maps.newHashMap();
407 Set<ConnectPoint> totalSources = Sets.newHashSet(candidateSources);
408 totalSources.addAll(remainingSources);
409 // Calculate all the links used by the sources
410 totalSources.forEach(source -> {
411 Set<ConnectPoint> currentSinks = sinks.values()
412 .stream().flatMap(Collection::stream)
413 .filter(sink -> isSinkForSource(mcastIp, sink, source))
414 .collect(Collectors.toSet());
415 candidateSinks.put(source, currentSinks);
416 currentSinks.forEach(currentSink -> {
417 Optional<Path> currentPath = getPath(source.deviceId(), currentSink.deviceId(),
418 mcastIp, null, source);
419 if (currentPath.isPresent()) {
420 if (!candidateSources.contains(source)) {
421 remainingLinks.addAll(currentPath.get().links());
422 } else {
423 candidateLinks.put(source, Sets.newHashSet(currentPath.get().links()));
424 }
425 }
426 });
427 });
428 // Clean transit links
429 candidateLinks.forEach((source, currentCandidateLinks) -> {
430 Set<Link> linksToBeRemoved = Sets.difference(currentCandidateLinks, remainingLinks)
431 .immutableCopy();
432 if (!linksToBeRemoved.isEmpty()) {
433 currentCandidateLinks.forEach(link -> {
434 DeviceId srcLink = link.src().deviceId();
435 // Remove ports only on links to be removed
436 if (linksToBeRemoved.contains(link)) {
437 removePortFromDevice(link.src().deviceId(), link.src().port(), mcastIp,
438 mcastUtils.assignedVlan(srcLink.equals(source.deviceId()) ?
439 source : null));
440 }
441 // Remove role on the candidate links
442 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, srcLink, source));
443 });
444 }
445 });
446 // Clean ingress and egress
447 candidateSources.forEach(source -> {
448 Set<ConnectPoint> currentSinks = candidateSinks.get(source);
449 currentSinks.forEach(currentSink -> {
450 VlanId assignedVlan = mcastUtils.assignedVlan(source.deviceId().equals(currentSink.deviceId()) ?
451 source : null);
452 // Sinks co-located with the source
453 if (source.deviceId().equals(currentSink.deviceId())) {
454 if (source.port().equals(currentSink.port())) {
455 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
456 mcastIp, currentSink, source);
457 return;
458 }
459 // We need to check against the other sources and if it is
460 // necessary remove the port from the device - no overlap
461 Set<VlanId> otherVlans = remainingSources.stream()
462 // Only sources co-located and having this sink
463 .filter(remainingSource -> remainingSource.deviceId()
464 .equals(source.deviceId()) && candidateSinks.get(remainingSource)
465 .contains(currentSink))
466 .map(remainingSource -> mcastUtils.assignedVlan(
467 remainingSource.deviceId().equals(currentSink.deviceId()) ?
468 remainingSource : null)).collect(Collectors.toSet());
469 if (!otherVlans.contains(assignedVlan)) {
470 removePortFromDevice(currentSink.deviceId(), currentSink.port(),
471 mcastIp, assignedVlan);
472 }
473 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, currentSink.deviceId(),
474 source));
475 return;
476 }
477 Set<VlanId> otherVlans = remainingSources.stream()
478 .filter(remainingSource -> candidateSinks.get(remainingSource)
479 .contains(currentSink))
480 .map(remainingSource -> mcastUtils.assignedVlan(
481 remainingSource.deviceId().equals(currentSink.deviceId()) ?
482 remainingSource : null)).collect(Collectors.toSet());
483 // Sinks on other leaves
484 if (!otherVlans.contains(assignedVlan)) {
485 removePortFromDevice(currentSink.deviceId(), currentSink.port(),
486 mcastIp, assignedVlan);
487 }
488 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, currentSink.deviceId(),
489 source));
490 });
491 });
492 } finally {
493 mcastUnlock();
494 }
495 }
496
497 /**
Pierdb27b8d2018-04-17 16:29:56 +0200498 * Process the ROUTE_ADDED event.
Pier Luigie80d6b42018-02-26 12:31:38 +0100499 *
Pierdb27b8d2018-04-17 16:29:56 +0200500 * @param mcastIp the group address
Pier Luigie80d6b42018-02-26 12:31:38 +0100501 */
Pierdb27b8d2018-04-17 16:29:56 +0200502 private void processRouteAddedInternal(IpAddress mcastIp) {
Pier Luigie80d6b42018-02-26 12:31:38 +0100503 lastMcastChange = Instant.now();
504 mcastLock();
505 try {
Pierdb27b8d2018-04-17 16:29:56 +0200506 log.debug("Processing route added for group {}", mcastIp);
507 // Just elect a new leader
508 mcastUtils.isLeader(mcastIp);
Pier Luigie80d6b42018-02-26 12:31:38 +0100509 } finally {
510 mcastUnlock();
511 }
512 }
513
514 /**
Pier Luigi6786b922018-02-02 16:19:11 +0100515 * Removes the entire mcast tree related to this group.
Piere99511d2018-04-19 16:47:06 +0200516 * @param sources the source connect points
Pier Luigi6786b922018-02-02 16:19:11 +0100517 * @param mcastIp multicast group IP address
518 */
Piere99511d2018-04-19 16:47:06 +0200519 private void processRouteRemovedInternal(Set<ConnectPoint> sources, IpAddress mcastIp) {
Pier Luigi6786b922018-02-02 16:19:11 +0100520 lastMcastChange = Instant.now();
521 mcastLock();
522 try {
Pier Luigie80d6b42018-02-26 12:31:38 +0100523 log.debug("Processing route removed for group {}", mcastIp);
Pierdb27b8d2018-04-17 16:29:56 +0200524 if (!mcastUtils.isLeader(mcastIp)) {
525 log.debug("Skip {} due to lack of leadership", mcastIp);
526 mcastUtils.withdrawLeader(mcastIp);
527 return;
528 }
Piere99511d2018-04-19 16:47:06 +0200529 sources.forEach(source -> {
530 // Find out the ingress, transit and egress device of the affected group
531 DeviceId ingressDevice = getDevice(mcastIp, INGRESS, source)
532 .stream().findFirst().orElse(null);
533 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
534 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
535 // If there are no egress and transit devices, sinks could be only on the ingress
536 if (!egressDevices.isEmpty()) {
537 egressDevices.forEach(deviceId -> {
538 removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null));
539 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, deviceId, source));
540 });
541 }
542 if (!transitDevices.isEmpty()) {
543 transitDevices.forEach(deviceId -> {
544 removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null));
545 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, deviceId, source));
546 });
547 }
548 if (ingressDevice != null) {
549 removeGroupFromDevice(ingressDevice, mcastIp, mcastUtils.assignedVlan(source));
550 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, ingressDevice, source));
551 }
552 });
553 // Finally, withdraw the leadership
554 mcastUtils.withdrawLeader(mcastIp);
Pier Luigi6786b922018-02-02 16:19:11 +0100555 } finally {
556 mcastUnlock();
557 }
558 }
559
Pier7b657162018-03-27 11:29:42 -0700560 /**
561 * Process sinks to be removed.
562 *
Piere99511d2018-04-19 16:47:06 +0200563 * @param sources the source connect points
Pier7b657162018-03-27 11:29:42 -0700564 * @param mcastIp the ip address of the group
565 * @param newSinks the new sinks to be processed
Pier28164682018-04-17 15:50:43 +0200566 * @param prevSinks the previous sinks
Pier7b657162018-03-27 11:29:42 -0700567 */
Piere99511d2018-04-19 16:47:06 +0200568 private void processSinksRemovedInternal(Set<ConnectPoint> sources, IpAddress mcastIp,
Pier7b657162018-03-27 11:29:42 -0700569 Map<HostId, Set<ConnectPoint>> newSinks,
Pier28164682018-04-17 15:50:43 +0200570 Map<HostId, Set<ConnectPoint>> prevSinks) {
Pier7b657162018-03-27 11:29:42 -0700571 lastMcastChange = Instant.now();
572 mcastLock();
Pier7b657162018-03-27 11:29:42 -0700573 try {
Pierdb27b8d2018-04-17 16:29:56 +0200574 if (!mcastUtils.isLeader(mcastIp)) {
575 log.debug("Skip {} due to lack of leadership", mcastIp);
576 return;
577 }
Piere99511d2018-04-19 16:47:06 +0200578 log.debug("Processing sinks removed for group {} and for sources {}",
579 mcastIp, sources);
580 Map<ConnectPoint, Map<ConnectPoint, Optional<Path>>> treesToBeRemoved = Maps.newHashMap();
581 Map<ConnectPoint, Set<ConnectPoint>> treesToBeAdded = Maps.newHashMap();
582 sources.forEach(source -> {
583 // Save the path associated to the sinks to be removed
584 Set<ConnectPoint> sinksToBeRemoved = processSinksToBeRemoved(mcastIp, prevSinks,
585 newSinks, source);
586 Map<ConnectPoint, Optional<Path>> treeToBeRemoved = Maps.newHashMap();
587 sinksToBeRemoved.forEach(sink -> treeToBeRemoved.put(sink, getPath(source.deviceId(),
588 sink.deviceId(), mcastIp,
589 null, source)));
590 treesToBeRemoved.put(source, treeToBeRemoved);
591 // Recover the dual-homed sinks
592 Set<ConnectPoint> sinksToBeRecovered = processSinksToBeRecovered(mcastIp, newSinks,
593 prevSinks, source);
594 treesToBeAdded.put(source, sinksToBeRecovered);
595 });
596 // Remove the sinks taking into account the multiple sources and the original paths
597 treesToBeRemoved.forEach((source, tree) ->
598 tree.forEach((sink, path) -> processSinkRemovedInternal(source, sink, mcastIp, path)));
599 // Add new sinks according to the recovery procedure
600 treesToBeAdded.forEach((source, sinks) ->
601 sinks.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null)));
Pier7b657162018-03-27 11:29:42 -0700602 } finally {
603 mcastUnlock();
Pier7b657162018-03-27 11:29:42 -0700604 }
605 }
606
Pier Luigi6786b922018-02-02 16:19:11 +0100607 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +0100608 * Removes a path from source to sink for given multicast group.
609 *
610 * @param source connect point of the multicast source
611 * @param sink connection point of the multicast sink
612 * @param mcastIp multicast group IP address
Piere99511d2018-04-19 16:47:06 +0200613 * @param mcastPath path associated to the sink
Pier Luigi35dab3f2018-01-25 16:16:02 +0100614 */
615 private void processSinkRemovedInternal(ConnectPoint source, ConnectPoint sink,
Piere99511d2018-04-19 16:47:06 +0200616 IpAddress mcastIp, Optional<Path> mcastPath) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100617 lastMcastChange = Instant.now();
618 mcastLock();
619 try {
Piere99511d2018-04-19 16:47:06 +0200620 log.debug("Processing sink removed {} for group {} and for source {}", sink, mcastIp, source);
Pier7b657162018-03-27 11:29:42 -0700621 boolean isLast;
Pier Luigi35dab3f2018-01-25 16:16:02 +0100622 // When source and sink are on the same device
623 if (source.deviceId().equals(sink.deviceId())) {
624 // Source and sink are on even the same port. There must be something wrong.
625 if (source.port().equals(sink.port())) {
Piere99511d2018-04-19 16:47:06 +0200626 log.warn("Skip {} since sink {} is on the same port of source {}. Abort", mcastIp, sink, source);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100627 return;
628 }
Pier7b657162018-03-27 11:29:42 -0700629 isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
Pier Luigi92e69be2018-03-02 12:53:37 +0100630 if (isLast) {
Piere99511d2018-04-19 16:47:06 +0200631 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, sink.deviceId(), source));
Pier Luigi92e69be2018-03-02 12:53:37 +0100632 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100633 return;
634 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100635 // Process the egress device
Pier7b657162018-03-27 11:29:42 -0700636 isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100637 if (isLast) {
Piere99511d2018-04-19 16:47:06 +0200638 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, sink.deviceId(), source));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100639 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100640 // If this is the last sink on the device, also update upstream
Pier Luigi35dab3f2018-01-25 16:16:02 +0100641 if (mcastPath.isPresent()) {
642 List<Link> links = Lists.newArrayList(mcastPath.get().links());
643 Collections.reverse(links);
644 for (Link link : links) {
645 if (isLast) {
Piere99511d2018-04-19 16:47:06 +0200646 isLast = removePortFromDevice(link.src().deviceId(), link.src().port(), mcastIp,
647 mcastUtils.assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
Pier Luigi92e69be2018-03-02 12:53:37 +0100648 if (isLast) {
Piere99511d2018-04-19 16:47:06 +0200649 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, link.src().deviceId(), source));
Pier Luigi92e69be2018-03-02 12:53:37 +0100650 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100651 }
Charles Chanc91c8782016-03-30 17:54:24 -0700652 }
653 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100654 } finally {
655 mcastUnlock();
Charles Chanc91c8782016-03-30 17:54:24 -0700656 }
657 }
658
Pier7b657162018-03-27 11:29:42 -0700659
660 /**
661 * Process sinks to be added.
662 *
Piere99511d2018-04-19 16:47:06 +0200663 * @param sources the source connect points
Pier7b657162018-03-27 11:29:42 -0700664 * @param mcastIp the group IP
665 * @param newSinks the new sinks to be processed
666 * @param allPrevSinks all previous sinks
667 */
Piere99511d2018-04-19 16:47:06 +0200668 private void processSinksAddedInternal(Set<ConnectPoint> sources, IpAddress mcastIp,
Pier7b657162018-03-27 11:29:42 -0700669 Map<HostId, Set<ConnectPoint>> newSinks,
670 Set<ConnectPoint> allPrevSinks) {
671 lastMcastChange = Instant.now();
672 mcastLock();
673 try {
Pierdb27b8d2018-04-17 16:29:56 +0200674 if (!mcastUtils.isLeader(mcastIp)) {
675 log.debug("Skip {} due to lack of leadership", mcastIp);
676 return;
677 }
Piere99511d2018-04-19 16:47:06 +0200678 log.debug("Processing sinks added for group {} and for sources {}", mcastIp, sources);
679 sources.forEach(source -> {
680 Set<ConnectPoint> sinksToBeAdded = processSinksToBeAdded(source, mcastIp, newSinks);
681 sinksToBeAdded = Sets.difference(sinksToBeAdded, allPrevSinks);
682 sinksToBeAdded.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null));
683 });
Pier7b657162018-03-27 11:29:42 -0700684 } finally {
685 mcastUnlock();
686 }
687 }
688
Charles Chanc91c8782016-03-30 17:54:24 -0700689 /**
690 * Establishes a path from source to sink for given multicast group.
691 *
692 * @param source connect point of the multicast source
693 * @param sink connection point of the multicast sink
694 * @param mcastIp multicast group IP address
695 */
696 private void processSinkAddedInternal(ConnectPoint source, ConnectPoint sink,
Pier7b657162018-03-27 11:29:42 -0700697 IpAddress mcastIp, List<Path> allPaths) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100698 lastMcastChange = Instant.now();
699 mcastLock();
700 try {
Piere99511d2018-04-19 16:47:06 +0200701 log.debug("Processing sink added {} for group {} and for source {}", sink, mcastIp, source);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100702 // Process the ingress device
Pier7b657162018-03-27 11:29:42 -0700703 mcastUtils.addFilterToDevice(source.deviceId(), source.port(),
Piere99511d2018-04-19 16:47:06 +0200704 mcastUtils.assignedVlan(source), mcastIp, INGRESS);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100705 if (source.deviceId().equals(sink.deviceId())) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100706 if (source.port().equals(sink.port())) {
707 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
708 mcastIp, sink, source);
709 return;
710 }
Pier7b657162018-03-27 11:29:42 -0700711 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
Piere99511d2018-04-19 16:47:06 +0200712 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, sink.deviceId(), source), INGRESS);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100713 return;
714 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100715 // Find a path. If present, create/update groups and flows for each hop
Piere99511d2018-04-19 16:47:06 +0200716 Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp, allPaths, source);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100717 if (mcastPath.isPresent()) {
718 List<Link> links = mcastPath.get().links();
Pier1a7e0c02018-03-12 15:00:54 -0700719 // Setup mcast role for ingress
Piere99511d2018-04-19 16:47:06 +0200720 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, source.deviceId(), source), INGRESS);
721 // Setup properly the transit forwarding
Pier Luigi35dab3f2018-01-25 16:16:02 +0100722 links.forEach(link -> {
723 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
Pier7b657162018-03-27 11:29:42 -0700724 mcastUtils.assignedVlan(link.src().deviceId()
725 .equals(source.deviceId()) ? source : null));
726 mcastUtils.addFilterToDevice(link.dst().deviceId(), link.dst().port(),
727 mcastUtils.assignedVlan(null), mcastIp, null);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100728 });
Pier1a7e0c02018-03-12 15:00:54 -0700729 // Setup mcast role for the transit
730 links.stream()
731 .filter(link -> !link.dst().deviceId().equals(sink.deviceId()))
Piere99511d2018-04-19 16:47:06 +0200732 .forEach(link -> mcastRoleStore.put(new McastRoleStoreKey(mcastIp, link.dst().deviceId(),
733 source), TRANSIT));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100734 // Process the egress device
Pier7b657162018-03-27 11:29:42 -0700735 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
Pier1a7e0c02018-03-12 15:00:54 -0700736 // Setup mcast role for egress
Piere99511d2018-04-19 16:47:06 +0200737 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, sink.deviceId(), source), EGRESS);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100738 } else {
Piere99511d2018-04-19 16:47:06 +0200739 log.warn("Unable to find a path from {} to {}. Abort sinkAdded", source.deviceId(), sink.deviceId());
Pier Luigi35dab3f2018-01-25 16:16:02 +0100740 }
741 } finally {
742 mcastUnlock();
Charles Chanc91c8782016-03-30 17:54:24 -0700743 }
744 }
745
746 /**
Charles Chan72779502016-04-23 17:36:10 -0700747 * Processes the LINK_DOWN event.
748 *
749 * @param affectedLink Link that is going down
750 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100751 public void processLinkDown(Link affectedLink) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100752 lastMcastChange = Instant.now();
753 mcastLock();
754 try {
755 // Get groups affected by the link down event
756 getAffectedGroups(affectedLink).forEach(mcastIp -> {
Piere99511d2018-04-19 16:47:06 +0200757 log.debug("Processing link down {} for group {}", affectedLink, mcastIp);
758 recoverFailure(mcastIp, affectedLink);
Charles Chan72779502016-04-23 17:36:10 -0700759 });
Pier Luigi35dab3f2018-01-25 16:16:02 +0100760 } finally {
761 mcastUnlock();
762 }
Charles Chan72779502016-04-23 17:36:10 -0700763 }
764
765 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +0100766 * Process the DEVICE_DOWN event.
767 *
768 * @param deviceDown device going down
769 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100770 public void processDeviceDown(DeviceId deviceDown) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100771 lastMcastChange = Instant.now();
772 mcastLock();
773 try {
774 // Get the mcast groups affected by the device going down
775 getAffectedGroups(deviceDown).forEach(mcastIp -> {
Piere99511d2018-04-19 16:47:06 +0200776 log.debug("Processing device down {} for group {}", deviceDown, mcastIp);
777 recoverFailure(mcastIp, deviceDown);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100778 });
779 } finally {
780 mcastUnlock();
781 }
Pier Luigi580fd8a2018-01-16 10:47:50 +0100782 }
783
784 /**
Piere99511d2018-04-19 16:47:06 +0200785 * General failure recovery procedure.
786 *
787 * @param mcastIp the group to recover
788 * @param failedElement the failed element
789 */
790 private void recoverFailure(IpAddress mcastIp, Object failedElement) {
791 // TODO Optimize when the group editing is in place
792 if (!mcastUtils.isLeader(mcastIp)) {
793 log.debug("Skip {} due to lack of leadership", mcastIp);
794 return;
795 }
796 // Do not proceed if the sources of this group are missing
797 Set<ConnectPoint> sources = getSources(mcastIp);
798 if (sources.isEmpty()) {
799 log.warn("Missing sources for group {}", mcastIp);
800 return;
801 }
802 // Find out the ingress devices of the affected group
803 // If sinks are in other leafs, we have ingress, transit, egress, and source
804 // If sinks are in the same leaf, we have just ingress and source
805 Set<DeviceId> ingressDevices = getDevice(mcastIp, INGRESS);
806 if (ingressDevices.isEmpty()) {
807 log.warn("Missing ingress devices for group {}", ingressDevices, mcastIp);
808 return;
809 }
810 // For each tree, delete ingress-transit part
811 sources.forEach(source -> {
812 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
813 transitDevices.forEach(transitDevice -> {
814 removeGroupFromDevice(transitDevice, mcastIp, mcastUtils.assignedVlan(null));
815 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, transitDevice, source));
816 });
817 });
818 removeIngressTransitPorts(mcastIp, ingressDevices, sources);
819 // TODO Evaluate the possibility of building optimize trees between sources
820 Map<DeviceId, Set<ConnectPoint>> notRecovered = Maps.newHashMap();
821 sources.forEach(source -> {
822 Set<DeviceId> notRecoveredInternal = Sets.newHashSet();
823 DeviceId ingressDevice = ingressDevices.stream()
824 .filter(deviceId -> deviceId.equals(source.deviceId())).findFirst().orElse(null);
825 // Clean also the ingress
826 if (failedElement instanceof DeviceId && ingressDevice.equals(failedElement)) {
827 removeGroupFromDevice((DeviceId) failedElement, mcastIp, mcastUtils.assignedVlan(source));
828 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, (DeviceId) failedElement, source));
829 }
830 if (ingressDevice == null) {
831 log.warn("Skip failure recovery - " +
832 "Missing ingress for source {} and group {}", source, mcastIp);
833 return;
834 }
835 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
836 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(ingressDevice, egressDevices);
837 // We have to verify, if there are egresses without paths
838 mcastTree.forEach((egressDevice, paths) -> {
839 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
840 mcastIp, paths, source);
841 // No paths, we have to try with alternative location
842 if (!mcastPath.isPresent()) {
843 notRecovered.compute(egressDevice, (deviceId, listSources) -> {
844 listSources = listSources == null ? Sets.newHashSet() : listSources;
845 listSources.add(source);
846 return listSources;
847 });
848 notRecoveredInternal.add(egressDevice);
849 }
850 });
851 // Fast path, we can recover all the locations
852 if (notRecoveredInternal.isEmpty()) {
853 mcastTree.forEach((egressDevice, paths) -> {
Andrea Campanellaef30d7a2018-04-27 14:44:15 +0200854 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice, mcastIp, paths, source);
855 mcastPath.ifPresent(path -> installPath(mcastIp, source, path));
Piere99511d2018-04-19 16:47:06 +0200856 });
857 } else {
858 // Let's try to recover using alternative locations
859 recoverSinks(egressDevices, notRecoveredInternal, mcastIp,
860 ingressDevice, source);
861 }
862 });
863 // Finally remove the egresses not recovered
864 notRecovered.forEach((egressDevice, listSources) -> {
865 Set<ConnectPoint> currentSources = getSources(mcastIp, egressDevice, EGRESS);
866 if (Objects.equal(currentSources, listSources)) {
867 log.warn("Fail to recover egress device {} from {} failure {}",
868 egressDevice, failedElement instanceof Link ? "Link" : "Device", failedElement);
869 removeGroupFromDevice(egressDevice, mcastIp, mcastUtils.assignedVlan(null));
870 }
871 listSources.forEach(source -> mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, egressDevice, source)));
872 });
873 }
874
875 /**
Pier7b657162018-03-27 11:29:42 -0700876 * Try to recover sinks using alternate locations.
877 *
878 * @param egressDevices the original egress devices
879 * @param notRecovered the devices not recovered
880 * @param mcastIp the group address
881 * @param ingressDevice the ingress device
882 * @param source the source connect point
Pier7b657162018-03-27 11:29:42 -0700883 */
884 private void recoverSinks(Set<DeviceId> egressDevices, Set<DeviceId> notRecovered,
Piere99511d2018-04-19 16:47:06 +0200885 IpAddress mcastIp, DeviceId ingressDevice, ConnectPoint source) {
886 log.debug("Processing recover sinks for group {} and for source {}",
887 mcastIp, source);
Pier7b657162018-03-27 11:29:42 -0700888 Set<DeviceId> recovered = Sets.difference(egressDevices, notRecovered);
Pier7b657162018-03-27 11:29:42 -0700889 Set<ConnectPoint> totalAffectedSinks = Sets.newHashSet();
Pier7b657162018-03-27 11:29:42 -0700890 Set<ConnectPoint> totalSinks = Sets.newHashSet();
891 // Let's compute all the affected sinks and all the sinks
892 notRecovered.forEach(deviceId -> {
893 totalAffectedSinks.addAll(
Andrea Campanellaef30d7a2018-04-27 14:44:15 +0200894 mcastUtils.getAffectedSinks(deviceId, mcastIp).values().stream().flatMap(Collection::stream)
Pier7b657162018-03-27 11:29:42 -0700895 .filter(connectPoint -> connectPoint.deviceId().equals(deviceId))
Andrea Campanellaef30d7a2018-04-27 14:44:15 +0200896 .collect(Collectors.toSet()));
Pier7b657162018-03-27 11:29:42 -0700897 totalSinks.addAll(
Piere99511d2018-04-19 16:47:06 +0200898 mcastUtils.getAffectedSinks(deviceId, mcastIp).values().stream()
Andrea Campanellaef30d7a2018-04-27 14:44:15 +0200899 .flatMap(Collection::stream).collect(Collectors.toSet()));
Pier7b657162018-03-27 11:29:42 -0700900 });
Pier7b657162018-03-27 11:29:42 -0700901 Set<ConnectPoint> sinksToBeAdded = Sets.difference(totalSinks, totalAffectedSinks);
Piere99511d2018-04-19 16:47:06 +0200902 Set<DeviceId> newEgressDevices = sinksToBeAdded.stream()
903 .map(ConnectPoint::deviceId).collect(Collectors.toSet());
904 newEgressDevices.addAll(recovered);
905 Set<DeviceId> copyNewEgressDevices = ImmutableSet.copyOf(newEgressDevices);
906 newEgressDevices = newEgressDevices.stream()
907 .filter(deviceId -> !deviceId.equals(ingressDevice)).collect(Collectors.toSet());
908 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(ingressDevice, newEgressDevices);
Pier7b657162018-03-27 11:29:42 -0700909 // if the source was originally in the new locations, add new sinks
Piere99511d2018-04-19 16:47:06 +0200910 if (copyNewEgressDevices.contains(ingressDevice)) {
Pier7b657162018-03-27 11:29:42 -0700911 sinksToBeAdded.stream()
912 .filter(connectPoint -> connectPoint.deviceId().equals(ingressDevice))
913 .forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, ImmutableList.of()));
914 }
Pier7b657162018-03-27 11:29:42 -0700915 // Construct a new path for each egress device
916 mcastTree.forEach((egressDevice, paths) -> {
Piere99511d2018-04-19 16:47:06 +0200917 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice, mcastIp, paths, source);
Pier7b657162018-03-27 11:29:42 -0700918 if (mcastPath.isPresent()) {
919 // Using recovery procedure
920 if (recovered.contains(egressDevice)) {
921 installPath(mcastIp, source, mcastPath.get());
922 } else {
923 // otherwise we need to threat as new sink
924 sinksToBeAdded.stream()
925 .filter(connectPoint -> connectPoint.deviceId().equals(egressDevice))
926 .forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, paths));
927 }
Pier7b657162018-03-27 11:29:42 -0700928 }
929 });
Pier7b657162018-03-27 11:29:42 -0700930 }
931
932 /**
Pier28164682018-04-17 15:50:43 +0200933 * Process all the sinks related to a mcast group and return
934 * the ones to be removed.
935 *
936 * @param mcastIp the group address
937 * @param prevsinks the previous sinks to be evaluated
938 * @param newSinks the new sinks to be evaluted
Piere99511d2018-04-19 16:47:06 +0200939 * @param source the source connect point
Pier28164682018-04-17 15:50:43 +0200940 * @return the set of the sinks to be removed
941 */
942 private Set<ConnectPoint> processSinksToBeRemoved(IpAddress mcastIp,
943 Map<HostId, Set<ConnectPoint>> prevsinks,
Piere99511d2018-04-19 16:47:06 +0200944 Map<HostId, Set<ConnectPoint>> newSinks,
945 ConnectPoint source) {
Pier28164682018-04-17 15:50:43 +0200946 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
947 prevsinks.forEach(((hostId, connectPoints) -> {
948 // We have to check with the existing flows
949 ConnectPoint sinkToBeProcessed = connectPoints.stream()
Piere99511d2018-04-19 16:47:06 +0200950 .filter(connectPoint -> isSinkForSource(mcastIp, connectPoint, source))
Pier28164682018-04-17 15:50:43 +0200951 .findFirst().orElse(null);
952 if (sinkToBeProcessed != null) {
953 // If the host has been removed or location has been removed
954 if (!newSinks.containsKey(hostId) ||
955 !newSinks.get(hostId).contains(sinkToBeProcessed)) {
956 sinksToBeProcessed.add(sinkToBeProcessed);
957 }
958 }
959 }));
960 // We have done, return the set
961 return sinksToBeProcessed;
962 }
963
964 /**
Pier7b657162018-03-27 11:29:42 -0700965 * Process new locations and return the set of sinks to be added
966 * in the context of the recovery.
967 *
Pier28164682018-04-17 15:50:43 +0200968 * @param newSinks the remaining sinks
969 * @param prevSinks the previous sinks
Piere99511d2018-04-19 16:47:06 +0200970 * @param source the source connect point
Pier7b657162018-03-27 11:29:42 -0700971 * @return the set of the sinks to be processed
972 */
Andrea Campanellaef30d7a2018-04-27 14:44:15 +0200973 private Set<ConnectPoint> processSinksToBeRecovered(IpAddress mcastIp, Map<HostId, Set<ConnectPoint>> newSinks,
Piere99511d2018-04-19 16:47:06 +0200974 Map<HostId, Set<ConnectPoint>> prevSinks,
975 ConnectPoint source) {
Pier7b657162018-03-27 11:29:42 -0700976 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
Pier28164682018-04-17 15:50:43 +0200977 newSinks.forEach((hostId, connectPoints) -> {
Pier7b657162018-03-27 11:29:42 -0700978 // If it has more than 1 locations
979 if (connectPoints.size() > 1 || connectPoints.size() == 0) {
980 log.debug("Skip {} since sink {} has {} locations",
981 mcastIp, hostId, connectPoints.size());
982 return;
983 }
Pier28164682018-04-17 15:50:43 +0200984 // If previously it had two locations, we need to recover it
985 // Filter out if the remaining location is already served
986 if (prevSinks.containsKey(hostId) && prevSinks.get(hostId).size() == 2) {
Pier665b0fc2018-04-19 15:53:20 +0200987 ConnectPoint sinkToBeProcessed = connectPoints.stream()
Piere99511d2018-04-19 16:47:06 +0200988 .filter(connectPoint -> !isSinkForSource(mcastIp, connectPoint, source))
Pier665b0fc2018-04-19 15:53:20 +0200989 .findFirst().orElse(null);
990 if (sinkToBeProcessed != null) {
991 sinksToBeProcessed.add(sinkToBeProcessed);
992 }
Pier28164682018-04-17 15:50:43 +0200993 }
Pier7b657162018-03-27 11:29:42 -0700994 });
995 return sinksToBeProcessed;
996 }
997
998 /**
999 * Process all the sinks related to a mcast group and return
1000 * the ones to be processed.
1001 *
1002 * @param source the source connect point
1003 * @param mcastIp the group address
1004 * @param sinks the sinks to be evaluated
1005 * @return the set of the sinks to be processed
1006 */
1007 private Set<ConnectPoint> processSinksToBeAdded(ConnectPoint source, IpAddress mcastIp,
1008 Map<HostId, Set<ConnectPoint>> sinks) {
Pier7b657162018-03-27 11:29:42 -07001009 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
1010 sinks.forEach(((hostId, connectPoints) -> {
1011 // If it has more than 2 locations
1012 if (connectPoints.size() > 2 || connectPoints.size() == 0) {
1013 log.debug("Skip {} since sink {} has {} locations",
1014 mcastIp, hostId, connectPoints.size());
1015 return;
1016 }
1017 // If it has one location, just use it
1018 if (connectPoints.size() == 1) {
Piere99511d2018-04-19 16:47:06 +02001019 sinksToBeProcessed.add(connectPoints.stream().findFirst().orElse(null));
Pier7b657162018-03-27 11:29:42 -07001020 return;
1021 }
1022 // We prefer to reuse existing flows
1023 ConnectPoint sinkToBeProcessed = connectPoints.stream()
Piere99511d2018-04-19 16:47:06 +02001024 .filter(connectPoint -> {
1025 if (!isSinkForGroup(mcastIp, connectPoint, source)) {
1026 return false;
1027 }
1028 if (!isSinkReachable(mcastIp, connectPoint, source)) {
1029 return false;
1030 }
1031 ConnectPoint other = connectPoints.stream()
Andrea Campanellaef30d7a2018-04-27 14:44:15 +02001032 .filter(remaining -> !remaining.equals(connectPoint)).findFirst().orElse(null);
Piere99511d2018-04-19 16:47:06 +02001033 // We are already serving the sink
1034 return !isSinkForSource(mcastIp, other, source);
1035 }).findFirst().orElse(null);
1036
Pier7b657162018-03-27 11:29:42 -07001037 if (sinkToBeProcessed != null) {
1038 sinksToBeProcessed.add(sinkToBeProcessed);
1039 return;
1040 }
1041 // Otherwise we prefer to reuse existing egresses
Piere99511d2018-04-19 16:47:06 +02001042 Set<DeviceId> egresses = getDevice(mcastIp, EGRESS, source);
Pier7b657162018-03-27 11:29:42 -07001043 sinkToBeProcessed = connectPoints.stream()
Piere99511d2018-04-19 16:47:06 +02001044 .filter(connectPoint -> {
1045 if (!egresses.contains(connectPoint.deviceId())) {
1046 return false;
1047 }
1048 if (!isSinkReachable(mcastIp, connectPoint, source)) {
1049 return false;
1050 }
1051 ConnectPoint other = connectPoints.stream()
Andrea Campanellaef30d7a2018-04-27 14:44:15 +02001052 .filter(remaining -> !remaining.equals(connectPoint)).findFirst().orElse(null);
Piere99511d2018-04-19 16:47:06 +02001053 return !isSinkForSource(mcastIp, other, source);
1054 }).findFirst().orElse(null);
Pier7b657162018-03-27 11:29:42 -07001055 if (sinkToBeProcessed != null) {
1056 sinksToBeProcessed.add(sinkToBeProcessed);
1057 return;
1058 }
1059 // Otherwise we prefer a location co-located with the source (if it exists)
1060 sinkToBeProcessed = connectPoints.stream()
1061 .filter(connectPoint -> connectPoint.deviceId().equals(source.deviceId()))
1062 .findFirst().orElse(null);
1063 if (sinkToBeProcessed != null) {
1064 sinksToBeProcessed.add(sinkToBeProcessed);
1065 return;
1066 }
Piere99511d2018-04-19 16:47:06 +02001067 // Finally, we randomly pick a new location if it is reachable
1068 sinkToBeProcessed = connectPoints.stream()
1069 .filter(connectPoint -> {
1070 if (!isSinkReachable(mcastIp, connectPoint, source)) {
1071 return false;
1072 }
1073 ConnectPoint other = connectPoints.stream()
Andrea Campanellaef30d7a2018-04-27 14:44:15 +02001074 .filter(remaining -> !remaining.equals(connectPoint)).findFirst().orElse(null);
Piere99511d2018-04-19 16:47:06 +02001075 return !isSinkForSource(mcastIp, other, source);
1076 }).findFirst().orElse(null);
1077 if (sinkToBeProcessed != null) {
1078 sinksToBeProcessed.add(sinkToBeProcessed);
1079 }
Pier7b657162018-03-27 11:29:42 -07001080 }));
Pier7b657162018-03-27 11:29:42 -07001081 return sinksToBeProcessed;
1082 }
1083
1084 /**
Pier1a7e0c02018-03-12 15:00:54 -07001085 * Utility method to remove all the ingress transit ports.
1086 *
1087 * @param mcastIp the group ip
Piere99511d2018-04-19 16:47:06 +02001088 * @param ingressDevices the ingress devices
1089 * @param sources the source connect points
Pier1a7e0c02018-03-12 15:00:54 -07001090 */
Piere99511d2018-04-19 16:47:06 +02001091 private void removeIngressTransitPorts(IpAddress mcastIp, Set<DeviceId> ingressDevices,
1092 Set<ConnectPoint> sources) {
1093 Map<ConnectPoint, Set<PortNumber>> ingressTransitPorts = Maps.newHashMap();
1094 sources.forEach(source -> {
1095 DeviceId ingressDevice = ingressDevices.stream()
1096 .filter(deviceId -> deviceId.equals(source.deviceId()))
1097 .findFirst().orElse(null);
1098 if (ingressDevice == null) {
1099 log.warn("Skip removeIngressTransitPorts - " +
1100 "Missing ingress for source {} and group {}",
1101 source, mcastIp);
1102 return;
Pier1a7e0c02018-03-12 15:00:54 -07001103 }
Piere99511d2018-04-19 16:47:06 +02001104 ingressTransitPorts.put(source, ingressTransitPort(mcastIp, ingressDevice, source));
Pier1a7e0c02018-03-12 15:00:54 -07001105 });
Piere99511d2018-04-19 16:47:06 +02001106 ingressTransitPorts.forEach((source, ports) -> ports.forEach(ingressTransitPort -> {
1107 DeviceId ingressDevice = ingressDevices.stream()
Andrea Campanellaef30d7a2018-04-27 14:44:15 +02001108 .filter(deviceId -> deviceId.equals(source.deviceId())).findFirst().orElse(null);
Piere99511d2018-04-19 16:47:06 +02001109 boolean isLast = removePortFromDevice(ingressDevice, ingressTransitPort,
1110 mcastIp, mcastUtils.assignedVlan(source));
1111 if (isLast) {
1112 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, ingressDevice, source));
1113 }
1114 }));
Pier1a7e0c02018-03-12 15:00:54 -07001115 }
1116
1117 /**
Andrea Campanellaef30d7a2018-04-27 14:44:15 +02001118 * Installs flows to drop any multicast traffic for a tree that was not programmed.
1119 *
1120 * @param deviceId the device
1121 */
1122 public void dropUnprogrammedTrees(DeviceId deviceId) {
1123 try {
1124 if (srManager.deviceConfiguration().isEdgeDevice(deviceId)) {
1125 Set<VlanId> assignedVlans = srManager.deviceService.getPorts(deviceId).stream().map(port -> {
1126 return mcastUtils.assignedVlan(new ConnectPoint(port.element().id(), port.number()));
1127 }).collect(Collectors.toSet());
1128 mcastUtils.addDropFiltersToDevice(deviceId, true, assignedVlans);
1129 mcastUtils.addDropFiltersToDevice(deviceId, false, assignedVlans);
1130 }
1131 } catch (DeviceConfigNotFoundException e) {
1132 log.warn("Not installing mcast drop flows for unprogrammed trees on {}. Absent config", deviceId);
1133 }
1134 }
1135
1136 /**
Charles Chanc91c8782016-03-30 17:54:24 -07001137 * Adds a port to given multicast group on given device. This involves the
1138 * update of L3 multicast group and multicast routing table entry.
1139 *
1140 * @param deviceId device ID
1141 * @param port port to be added
1142 * @param mcastIp multicast group
1143 * @param assignedVlan assigned VLAN ID
1144 */
Andrea Campanellaef30d7a2018-04-27 14:44:15 +02001145 private void addPortToDevice(DeviceId deviceId, PortNumber port, IpAddress mcastIp, VlanId assignedVlan) {
Piere99511d2018-04-19 16:47:06 +02001146 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId, assignedVlan);
Charles Chanc91c8782016-03-30 17:54:24 -07001147 ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
Pier Luigi4f0dd212018-01-19 10:24:53 +01001148 NextObjective newNextObj;
Charles Chan72779502016-04-23 17:36:10 -07001149 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Charles Chanc91c8782016-03-30 17:54:24 -07001150 // First time someone request this mcast group via this device
1151 portBuilder.add(port);
Pier Luigi4f0dd212018-01-19 10:24:53 +01001152 // New nextObj
Andrea Campanellaef30d7a2018-04-27 14:44:15 +02001153 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan, portBuilder.build(), null).add();
Pier Luigi4f0dd212018-01-19 10:24:53 +01001154 // Store the new port
1155 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Charles Chanc91c8782016-03-30 17:54:24 -07001156 } else {
1157 // This device already serves some subscribers of this mcast group
Charles Chan72779502016-04-23 17:36:10 -07001158 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chanc91c8782016-03-30 17:54:24 -07001159 // Stop if the port is already in the nextobj
Pier7b657162018-03-27 11:29:42 -07001160 Set<PortNumber> existingPorts = mcastUtils.getPorts(nextObj.next());
Charles Chanc91c8782016-03-30 17:54:24 -07001161 if (existingPorts.contains(port)) {
1162 log.info("NextObj for {}/{} already exists. Abort", deviceId, port);
1163 return;
1164 }
Pier Luigi4f0dd212018-01-19 10:24:53 +01001165 // Let's add the port and reuse the previous one
Yuta HIGUCHIbef07b52018-02-09 18:05:23 -08001166 portBuilder.addAll(existingPorts).add(port);
Pier Luigi4f0dd212018-01-19 10:24:53 +01001167 // Reuse previous nextObj
Pier7b657162018-03-27 11:29:42 -07001168 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi4f0dd212018-01-19 10:24:53 +01001169 portBuilder.build(), nextObj.id()).addToExisting();
1170 // Store the final next objective and send only the difference to the driver
1171 mcastNextObjStore.put(mcastStoreKey, newNextObj);
1172 // Add just the new port
1173 portBuilder = ImmutableSet.builder();
1174 portBuilder.add(port);
Pier7b657162018-03-27 11:29:42 -07001175 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi4f0dd212018-01-19 10:24:53 +01001176 portBuilder.build(), nextObj.id()).addToExisting();
Charles Chanc91c8782016-03-30 17:54:24 -07001177 }
1178 // Create, store and apply the new nextObj and fwdObj
Charles Chan72779502016-04-23 17:36:10 -07001179 ObjectiveContext context = new DefaultObjectiveContext(
1180 (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}",
1181 mcastIp, deviceId, port.toLong(), assignedVlan),
Piere99511d2018-04-19 16:47:06 +02001182 (objective, error) -> log.warn("Failed to add {} on {}/{}, vlan {}: {}",
Charles Chan72779502016-04-23 17:36:10 -07001183 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Pier7b657162018-03-27 11:29:42 -07001184 ForwardingObjective fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan,
1185 newNextObj.id()).add(context);
Charles Chanc91c8782016-03-30 17:54:24 -07001186 srManager.flowObjectiveService.next(deviceId, newNextObj);
1187 srManager.flowObjectiveService.forward(deviceId, fwdObj);
Charles Chanc91c8782016-03-30 17:54:24 -07001188 }
1189
1190 /**
1191 * Removes a port from given multicast group on given device.
1192 * This involves the update of L3 multicast group and multicast routing
1193 * table entry.
1194 *
1195 * @param deviceId device ID
1196 * @param port port to be added
1197 * @param mcastIp multicast group
1198 * @param assignedVlan assigned VLAN ID
1199 * @return true if this is the last sink on this device
1200 */
Andrea Campanellaef30d7a2018-04-27 14:44:15 +02001201 private boolean removePortFromDevice(DeviceId deviceId, PortNumber port, IpAddress mcastIp, VlanId assignedVlan) {
Charles Chan72779502016-04-23 17:36:10 -07001202 McastStoreKey mcastStoreKey =
Piere99511d2018-04-19 16:47:06 +02001203 new McastStoreKey(mcastIp, deviceId, assignedVlan);
Charles Chanc91c8782016-03-30 17:54:24 -07001204 // This device is not serving this multicast group
Charles Chan72779502016-04-23 17:36:10 -07001205 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Piere99511d2018-04-19 16:47:06 +02001206 return true;
Charles Chanc91c8782016-03-30 17:54:24 -07001207 }
Charles Chan72779502016-04-23 17:36:10 -07001208 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Pier7b657162018-03-27 11:29:42 -07001209 Set<PortNumber> existingPorts = mcastUtils.getPorts(nextObj.next());
Charles Chan72779502016-04-23 17:36:10 -07001210 // This port does not serve this multicast group
Charles Chanc91c8782016-03-30 17:54:24 -07001211 if (!existingPorts.contains(port)) {
Piere99511d2018-04-19 16:47:06 +02001212 if (!existingPorts.isEmpty()) {
1213 log.warn("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
1214 return false;
1215 }
1216 return true;
Charles Chanc91c8782016-03-30 17:54:24 -07001217 }
1218 // Copy and modify the ImmutableSet
1219 existingPorts = Sets.newHashSet(existingPorts);
1220 existingPorts.remove(port);
Charles Chanc91c8782016-03-30 17:54:24 -07001221 NextObjective newNextObj;
Pier Luigi8cd46de2018-01-19 10:24:53 +01001222 ObjectiveContext context;
Charles Chanc91c8782016-03-30 17:54:24 -07001223 ForwardingObjective fwdObj;
1224 if (existingPorts.isEmpty()) {
Pier Luigi8cd46de2018-01-19 10:24:53 +01001225 context = new DefaultObjectiveContext(
Charles Chan72779502016-04-23 17:36:10 -07001226 (objective) -> log.debug("Successfully remove {} on {}/{}, vlan {}",
1227 mcastIp, deviceId, port.toLong(), assignedVlan),
Piere99511d2018-04-19 16:47:06 +02001228 (objective, error) -> log.warn("Failed to remove {} on {}/{}, vlan {}: {}",
Charles Chan72779502016-04-23 17:36:10 -07001229 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Pier7b657162018-03-27 11:29:42 -07001230 fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
Charles Chan72779502016-04-23 17:36:10 -07001231 mcastNextObjStore.remove(mcastStoreKey);
Charles Chanc91c8782016-03-30 17:54:24 -07001232 } else {
1233 // If this is not the last sink, update flows and groups
Pier Luigi8cd46de2018-01-19 10:24:53 +01001234 context = new DefaultObjectiveContext(
Charles Chan72779502016-04-23 17:36:10 -07001235 (objective) -> log.debug("Successfully update {} on {}/{}, vlan {}",
1236 mcastIp, deviceId, port.toLong(), assignedVlan),
Piere99511d2018-04-19 16:47:06 +02001237 (objective, error) -> log.warn("Failed to update {} on {}/{}, vlan {}: {}",
Charles Chan72779502016-04-23 17:36:10 -07001238 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Pier Luigi8cd46de2018-01-19 10:24:53 +01001239 // Here we store the next objective with the remaining port
Pier7b657162018-03-27 11:29:42 -07001240 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi8cd46de2018-01-19 10:24:53 +01001241 existingPorts, nextObj.id()).removeFromExisting();
Pier7b657162018-03-27 11:29:42 -07001242 fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add(context);
Charles Chan72779502016-04-23 17:36:10 -07001243 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Charles Chanc91c8782016-03-30 17:54:24 -07001244 }
Pier Luigi8cd46de2018-01-19 10:24:53 +01001245 // Let's modify the next objective removing the bucket
Pier7b657162018-03-27 11:29:42 -07001246 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi8cd46de2018-01-19 10:24:53 +01001247 ImmutableSet.of(port), nextObj.id()).removeFromExisting();
1248 srManager.flowObjectiveService.next(deviceId, newNextObj);
1249 srManager.flowObjectiveService.forward(deviceId, fwdObj);
Charles Chanc91c8782016-03-30 17:54:24 -07001250 return existingPorts.isEmpty();
1251 }
1252
Charles Chan72779502016-04-23 17:36:10 -07001253 /**
1254 * Removes entire group on given device.
1255 *
1256 * @param deviceId device ID
1257 * @param mcastIp multicast group to be removed
1258 * @param assignedVlan assigned VLAN ID
1259 */
Andrea Campanellaef30d7a2018-04-27 14:44:15 +02001260 private void removeGroupFromDevice(DeviceId deviceId, IpAddress mcastIp, VlanId assignedVlan) {
Piere99511d2018-04-19 16:47:06 +02001261 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId, assignedVlan);
Charles Chan72779502016-04-23 17:36:10 -07001262 // This device is not serving this multicast group
1263 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
1264 log.warn("{} is not serving {}. Abort.", deviceId, mcastIp);
1265 return;
1266 }
1267 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chan72779502016-04-23 17:36:10 -07001268 ObjectiveContext context = new DefaultObjectiveContext(
1269 (objective) -> log.debug("Successfully remove {} on {}, vlan {}",
1270 mcastIp, deviceId, assignedVlan),
Piere99511d2018-04-19 16:47:06 +02001271 (objective, error) -> log.warn("Failed to remove {} on {}, vlan {}: {}",
Charles Chan72779502016-04-23 17:36:10 -07001272 mcastIp, deviceId, assignedVlan, error));
Pier7b657162018-03-27 11:29:42 -07001273 ForwardingObjective fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
Charles Chan72779502016-04-23 17:36:10 -07001274 srManager.flowObjectiveService.forward(deviceId, fwdObj);
1275 mcastNextObjStore.remove(mcastStoreKey);
Charles Chan72779502016-04-23 17:36:10 -07001276 }
1277
Pier Luigi580fd8a2018-01-16 10:47:50 +01001278 private void installPath(IpAddress mcastIp, ConnectPoint source, Path mcastPath) {
Pier Luigi580fd8a2018-01-16 10:47:50 +01001279 List<Link> links = mcastPath.links();
Pier1a7e0c02018-03-12 15:00:54 -07001280 // Setup new ingress mcast role
Piere99511d2018-04-19 16:47:06 +02001281 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, links.get(0).src().deviceId(), source),
Pier1a7e0c02018-03-12 15:00:54 -07001282 INGRESS);
Pier Luigi580fd8a2018-01-16 10:47:50 +01001283 // For each link, modify the next on the source device adding the src port
1284 // and a new filter objective on the destination port
1285 links.forEach(link -> {
1286 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
Pier7b657162018-03-27 11:29:42 -07001287 mcastUtils.assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
1288 mcastUtils.addFilterToDevice(link.dst().deviceId(), link.dst().port(),
1289 mcastUtils.assignedVlan(null), mcastIp, null);
Pier Luigi580fd8a2018-01-16 10:47:50 +01001290 });
Pier1a7e0c02018-03-12 15:00:54 -07001291 // Setup mcast role for the transit
1292 links.stream()
1293 .filter(link -> !link.src().deviceId().equals(source.deviceId()))
Piere99511d2018-04-19 16:47:06 +02001294 .forEach(link -> mcastRoleStore.put(new McastRoleStoreKey(mcastIp, link.src().deviceId(), source),
Pier1a7e0c02018-03-12 15:00:54 -07001295 TRANSIT));
Charles Chan72779502016-04-23 17:36:10 -07001296 }
1297
Charles Chanc91c8782016-03-30 17:54:24 -07001298 /**
Pier1f87aca2018-03-14 16:47:32 -07001299 * Go through all the paths, looking for shared links to be used
1300 * in the final path computation.
1301 *
1302 * @param egresses egress devices
1303 * @param availablePaths all the available paths towards the egress
1304 * @return shared links between egress devices
1305 */
Andrea Campanellaef30d7a2018-04-27 14:44:15 +02001306 private Set<Link> exploreMcastTree(Set<DeviceId> egresses, Map<DeviceId, List<Path>> availablePaths) {
Pier1f87aca2018-03-14 16:47:32 -07001307 int minLength = Integer.MAX_VALUE;
1308 int length;
Pier1f87aca2018-03-14 16:47:32 -07001309 List<Path> currentPaths;
1310 // Verify the source can still reach all the egresses
1311 for (DeviceId egress : egresses) {
1312 // From the source we cannot reach all the sinks
Pier7b657162018-03-27 11:29:42 -07001313 // just continue and let's figure out after
Pier1f87aca2018-03-14 16:47:32 -07001314 currentPaths = availablePaths.get(egress);
1315 if (currentPaths.isEmpty()) {
1316 continue;
1317 }
Piere99511d2018-04-19 16:47:06 +02001318 // Get the length of the first one available, update the min length
Pier1f87aca2018-03-14 16:47:32 -07001319 length = currentPaths.get(0).links().size();
1320 if (length < minLength) {
1321 minLength = length;
1322 }
Pier Luigi51ee7c02018-02-23 19:57:40 +01001323 }
Pier1f87aca2018-03-14 16:47:32 -07001324 // If there are no paths
1325 if (minLength == Integer.MAX_VALUE) {
1326 return Collections.emptySet();
1327 }
Pier1f87aca2018-03-14 16:47:32 -07001328 int index = 0;
Pier1f87aca2018-03-14 16:47:32 -07001329 Set<Link> sharedLinks = Sets.newHashSet();
1330 Set<Link> currentSharedLinks;
1331 Set<Link> currentLinks;
Pier7b657162018-03-27 11:29:42 -07001332 DeviceId egressToRemove = null;
Pier1f87aca2018-03-14 16:47:32 -07001333 // Let's find out the shared links
1334 while (index < minLength) {
1335 // Initialize the intersection with the paths related to the first egress
Piere99511d2018-04-19 16:47:06 +02001336 currentPaths = availablePaths.get(egresses.stream().findFirst().orElse(null));
Pier1f87aca2018-03-14 16:47:32 -07001337 currentSharedLinks = Sets.newHashSet();
1338 // Iterate over the paths and take the "index" links
1339 for (Path path : currentPaths) {
1340 currentSharedLinks.add(path.links().get(index));
1341 }
1342 // Iterate over the remaining egress
1343 for (DeviceId egress : egresses) {
1344 // Iterate over the paths and take the "index" links
1345 currentLinks = Sets.newHashSet();
1346 for (Path path : availablePaths.get(egress)) {
1347 currentLinks.add(path.links().get(index));
1348 }
1349 // Do intersection
1350 currentSharedLinks = Sets.intersection(currentSharedLinks, currentLinks);
1351 // If there are no shared paths exit and record the device to remove
1352 // we have to retry with a subset of sinks
1353 if (currentSharedLinks.isEmpty()) {
Pier7b657162018-03-27 11:29:42 -07001354 egressToRemove = egress;
Pier1f87aca2018-03-14 16:47:32 -07001355 index = minLength;
1356 break;
1357 }
1358 }
1359 sharedLinks.addAll(currentSharedLinks);
1360 index++;
1361 }
Piere99511d2018-04-19 16:47:06 +02001362 // If the shared links is empty and there are egress let's retry another time with less sinks,
1363 // we can still build optimal subtrees
Pier7b657162018-03-27 11:29:42 -07001364 if (sharedLinks.isEmpty() && egresses.size() > 1 && egressToRemove != null) {
1365 egresses.remove(egressToRemove);
Pier1f87aca2018-03-14 16:47:32 -07001366 sharedLinks = exploreMcastTree(egresses, availablePaths);
1367 }
1368 return sharedLinks;
1369 }
1370
1371 /**
1372 * Build Mcast tree having as root the given source and as leaves the given egress points.
1373 *
1374 * @param source source of the tree
1375 * @param sinks leaves of the tree
1376 * @return the computed Mcast tree
1377 */
Andrea Campanellaef30d7a2018-04-27 14:44:15 +02001378 private Map<ConnectPoint, List<Path>> computeSinkMcastTree(DeviceId source, Set<ConnectPoint> sinks) {
Pier1f87aca2018-03-14 16:47:32 -07001379 // Get the egress devices, remove source from the egress if present
Piere99511d2018-04-19 16:47:06 +02001380 Set<DeviceId> egresses = sinks.stream().map(ConnectPoint::deviceId)
1381 .filter(deviceId -> !deviceId.equals(source)).collect(Collectors.toSet());
Pier1f87aca2018-03-14 16:47:32 -07001382 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(source, egresses);
Pier1f87aca2018-03-14 16:47:32 -07001383 final Map<ConnectPoint, List<Path>> finalTree = Maps.newHashMap();
Pier7b657162018-03-27 11:29:42 -07001384 // We need to put back the source if it was originally present
1385 sinks.forEach(sink -> {
1386 List<Path> sinkPaths = mcastTree.get(sink.deviceId());
1387 finalTree.put(sink, sinkPaths != null ? sinkPaths : ImmutableList.of());
1388 });
Pier1f87aca2018-03-14 16:47:32 -07001389 return finalTree;
1390 }
1391
1392 /**
1393 * Build Mcast tree having as root the given source and as leaves the given egress.
1394 *
1395 * @param source source of the tree
1396 * @param egresses leaves of the tree
1397 * @return the computed Mcast tree
1398 */
1399 private Map<DeviceId, List<Path>> computeMcastTree(DeviceId source,
1400 Set<DeviceId> egresses) {
1401 // Pre-compute all the paths
1402 Map<DeviceId, List<Path>> availablePaths = Maps.newHashMap();
Pier1f87aca2018-03-14 16:47:32 -07001403 egresses.forEach(egress -> availablePaths.put(egress, getPaths(source, egress,
1404 Collections.emptySet())));
1405 // Explore the topology looking for shared links amongst the egresses
1406 Set<Link> linksToEnforce = exploreMcastTree(Sets.newHashSet(egresses), availablePaths);
Pier1f87aca2018-03-14 16:47:32 -07001407 // Build the final paths enforcing the shared links between egress devices
Piere99511d2018-04-19 16:47:06 +02001408 availablePaths.clear();
Pier1f87aca2018-03-14 16:47:32 -07001409 egresses.forEach(egress -> availablePaths.put(egress, getPaths(source, egress,
1410 linksToEnforce)));
1411 return availablePaths;
1412 }
1413
1414 /**
1415 * Gets path from src to dst computed using the custom link weigher.
1416 *
1417 * @param src source device ID
1418 * @param dst destination device ID
1419 * @return list of paths from src to dst
1420 */
1421 private List<Path> getPaths(DeviceId src, DeviceId dst, Set<Link> linksToEnforce) {
Pier1f87aca2018-03-14 16:47:32 -07001422 final Topology currentTopology = topologyService.currentTopology();
Pier1f87aca2018-03-14 16:47:32 -07001423 final LinkWeigher linkWeigher = new SRLinkWeigher(srManager, src, linksToEnforce);
Piere99511d2018-04-19 16:47:06 +02001424 List<Path> allPaths = Lists.newArrayList(topologyService.getPaths(currentTopology, src, dst, linkWeigher));
Pier1f87aca2018-03-14 16:47:32 -07001425 log.debug("{} path(s) found from {} to {}", allPaths.size(), src, dst);
1426 return allPaths;
Pier Luigi51ee7c02018-02-23 19:57:40 +01001427 }
1428
Charles Chanc91c8782016-03-30 17:54:24 -07001429 /**
1430 * Gets a path from src to dst.
1431 * If a path was allocated before, returns the allocated path.
1432 * Otherwise, randomly pick one from available paths.
1433 *
1434 * @param src source device ID
1435 * @param dst destination device ID
1436 * @param mcastIp multicast group
Pier1f87aca2018-03-14 16:47:32 -07001437 * @param allPaths paths list
Charles Chanc91c8782016-03-30 17:54:24 -07001438 * @return an optional path from src to dst
1439 */
Piere99511d2018-04-19 16:47:06 +02001440 private Optional<Path> getPath(DeviceId src, DeviceId dst, IpAddress mcastIp,
1441 List<Path> allPaths, ConnectPoint source) {
Pier1f87aca2018-03-14 16:47:32 -07001442 if (allPaths == null) {
1443 allPaths = getPaths(src, dst, Collections.emptySet());
1444 }
Charles Chanc91c8782016-03-30 17:54:24 -07001445 if (allPaths.isEmpty()) {
Charles Chanc91c8782016-03-30 17:54:24 -07001446 return Optional.empty();
1447 }
Piere99511d2018-04-19 16:47:06 +02001448 // Create a map index of suitability-to-list of paths. For example
Pier Luigi91573e12018-01-23 16:06:38 +01001449 // a path in the list associated to the index 1 shares only the
1450 // first hop and it is less suitable of a path belonging to the index
1451 // 2 that shares leaf-spine.
1452 Map<Integer, List<Path>> eligiblePaths = Maps.newHashMap();
Pier Luigi91573e12018-01-23 16:06:38 +01001453 int nhop;
1454 McastStoreKey mcastStoreKey;
Pier Luigi91573e12018-01-23 16:06:38 +01001455 PortNumber srcPort;
1456 Set<PortNumber> existingPorts;
1457 NextObjective nextObj;
Pier Luigi91573e12018-01-23 16:06:38 +01001458 for (Path path : allPaths) {
Pier Luigi91573e12018-01-23 16:06:38 +01001459 if (!src.equals(path.links().get(0).src().deviceId())) {
1460 continue;
1461 }
1462 nhop = 0;
1463 // Iterate over the links
Piere99511d2018-04-19 16:47:06 +02001464 for (Link hop : path.links()) {
1465 VlanId assignedVlan = mcastUtils.assignedVlan(hop.src().deviceId().equals(src) ?
1466 source : null);
1467 mcastStoreKey = new McastStoreKey(mcastIp, hop.src().deviceId(), assignedVlan);
1468 // It does not exist in the store, go to the next link
Pier Luigi91573e12018-01-23 16:06:38 +01001469 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Piere99511d2018-04-19 16:47:06 +02001470 continue;
Charles Chanc91c8782016-03-30 17:54:24 -07001471 }
Pier Luigi91573e12018-01-23 16:06:38 +01001472 nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Pier7b657162018-03-27 11:29:42 -07001473 existingPorts = mcastUtils.getPorts(nextObj.next());
Pier Luigi91573e12018-01-23 16:06:38 +01001474 srcPort = hop.src().port();
Piere99511d2018-04-19 16:47:06 +02001475 // the src port is not used as output, go to the next link
Pier Luigi91573e12018-01-23 16:06:38 +01001476 if (!existingPorts.contains(srcPort)) {
Piere99511d2018-04-19 16:47:06 +02001477 continue;
Pier Luigi91573e12018-01-23 16:06:38 +01001478 }
1479 nhop++;
1480 }
1481 // n_hop defines the index
1482 if (nhop > 0) {
1483 eligiblePaths.compute(nhop, (index, paths) -> {
1484 paths = paths == null ? Lists.newArrayList() : paths;
1485 paths.add(path);
1486 return paths;
1487 });
Charles Chanc91c8782016-03-30 17:54:24 -07001488 }
1489 }
Pier Luigi91573e12018-01-23 16:06:38 +01001490 if (eligiblePaths.isEmpty()) {
1491 log.debug("No eligiblePath(s) found from {} to {}", src, dst);
Pier Luigi91573e12018-01-23 16:06:38 +01001492 Collections.shuffle(allPaths);
1493 return allPaths.stream().findFirst();
1494 }
Pier Luigi91573e12018-01-23 16:06:38 +01001495 // Let's take the best ones
Piere99511d2018-04-19 16:47:06 +02001496 Integer bestIndex = eligiblePaths.keySet().stream()
1497 .sorted(Comparator.reverseOrder()).findFirst().orElse(null);
Pier Luigi91573e12018-01-23 16:06:38 +01001498 List<Path> bestPaths = eligiblePaths.get(bestIndex);
1499 log.debug("{} eligiblePath(s) found from {} to {}",
1500 bestPaths.size(), src, dst);
Pier Luigi91573e12018-01-23 16:06:38 +01001501 Collections.shuffle(bestPaths);
1502 return bestPaths.stream().findFirst();
Charles Chanc91c8782016-03-30 17:54:24 -07001503 }
1504
1505 /**
Piere99511d2018-04-19 16:47:06 +02001506 * Gets device(s) of given role and of given source in given multicast tree.
1507 *
1508 * @param mcastIp multicast IP
1509 * @param role multicast role
1510 * @param source source connect point
1511 * @return set of device ID or empty set if not found
1512 */
1513 private Set<DeviceId> getDevice(IpAddress mcastIp, McastRole role, ConnectPoint source) {
1514 return mcastRoleStore.entrySet().stream()
1515 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
Andrea Campanellaef30d7a2018-04-27 14:44:15 +02001516 entry.getKey().source().equals(source) && entry.getValue().value() == role)
Piere99511d2018-04-19 16:47:06 +02001517 .map(Entry::getKey).map(McastRoleStoreKey::deviceId).collect(Collectors.toSet());
1518 }
1519
1520 /**
Charles Chan72779502016-04-23 17:36:10 -07001521 * Gets device(s) of given role in given multicast group.
1522 *
1523 * @param mcastIp multicast IP
1524 * @param role multicast role
1525 * @return set of device ID or empty set if not found
1526 */
1527 private Set<DeviceId> getDevice(IpAddress mcastIp, McastRole role) {
1528 return mcastRoleStore.entrySet().stream()
1529 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
1530 entry.getValue().value() == role)
Piere99511d2018-04-19 16:47:06 +02001531 .map(Entry::getKey).map(McastRoleStoreKey::deviceId).collect(Collectors.toSet());
1532 }
1533
1534 /**
1535 * Gets source(s) of given role, given device in given multicast group.
1536 *
1537 * @param mcastIp multicast IP
1538 * @param deviceId device id
1539 * @param role multicast role
1540 * @return set of device ID or empty set if not found
1541 */
1542 private Set<ConnectPoint> getSources(IpAddress mcastIp, DeviceId deviceId, McastRole role) {
1543 return mcastRoleStore.entrySet().stream()
1544 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
1545 entry.getKey().deviceId().equals(deviceId) && entry.getValue().value() == role)
1546 .map(Entry::getKey).map(McastRoleStoreKey::source).collect(Collectors.toSet());
1547 }
1548
1549 /**
1550 * Gets source(s) of given multicast group.
1551 *
1552 * @param mcastIp multicast IP
1553 * @return set of device ID or empty set if not found
1554 */
1555 private Set<ConnectPoint> getSources(IpAddress mcastIp) {
1556 return mcastRoleStore.entrySet().stream()
1557 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp))
1558 .map(Entry::getKey).map(McastRoleStoreKey::source).collect(Collectors.toSet());
Charles Chan72779502016-04-23 17:36:10 -07001559 }
1560
1561 /**
1562 * Gets groups which is affected by the link down event.
1563 *
1564 * @param link link going down
1565 * @return a set of multicast IpAddress
1566 */
1567 private Set<IpAddress> getAffectedGroups(Link link) {
1568 DeviceId deviceId = link.src().deviceId();
1569 PortNumber port = link.src().port();
1570 return mcastNextObjStore.entrySet().stream()
1571 .filter(entry -> entry.getKey().deviceId().equals(deviceId) &&
Piere99511d2018-04-19 16:47:06 +02001572 mcastUtils.getPorts(entry.getValue().value().next()).contains(port))
1573 .map(Entry::getKey).map(McastStoreKey::mcastIp).collect(Collectors.toSet());
Charles Chan72779502016-04-23 17:36:10 -07001574 }
1575
1576 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +01001577 * Gets groups which are affected by the device down event.
1578 *
1579 * @param deviceId device going down
1580 * @return a set of multicast IpAddress
1581 */
1582 private Set<IpAddress> getAffectedGroups(DeviceId deviceId) {
1583 return mcastNextObjStore.entrySet().stream()
1584 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
Pier1f87aca2018-03-14 16:47:32 -07001585 .map(Entry::getKey).map(McastStoreKey::mcastIp)
Pier Luigi580fd8a2018-01-16 10:47:50 +01001586 .collect(Collectors.toSet());
1587 }
1588
1589 /**
Charles Chan72779502016-04-23 17:36:10 -07001590 * Gets the spine-facing port on ingress device of given multicast group.
1591 *
1592 * @param mcastIp multicast IP
Piere99511d2018-04-19 16:47:06 +02001593 * @param ingressDevice the ingress device
1594 * @param source the source connect point
Charles Chan72779502016-04-23 17:36:10 -07001595 * @return spine-facing port on ingress device
1596 */
Andrea Campanellaef30d7a2018-04-27 14:44:15 +02001597 private Set<PortNumber> ingressTransitPort(IpAddress mcastIp, DeviceId ingressDevice, ConnectPoint source) {
Pier1a7e0c02018-03-12 15:00:54 -07001598 ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
Charles Chan72779502016-04-23 17:36:10 -07001599 if (ingressDevice != null) {
Piere99511d2018-04-19 16:47:06 +02001600 NextObjective nextObj = mcastNextObjStore.get(new McastStoreKey(mcastIp, ingressDevice,
1601 mcastUtils.assignedVlan(source))).value();
Pier7b657162018-03-27 11:29:42 -07001602 Set<PortNumber> ports = mcastUtils.getPorts(nextObj.next());
Pier1a7e0c02018-03-12 15:00:54 -07001603 // Let's find out all the ingress-transit ports
Charles Chan72779502016-04-23 17:36:10 -07001604 for (PortNumber port : ports) {
1605 // Spine-facing port should have no subnet and no xconnect
Pier Luigi69f774d2018-02-28 12:10:50 +01001606 if (srManager.deviceConfiguration() != null &&
1607 srManager.deviceConfiguration().getPortSubnets(ingressDevice, port).isEmpty() &&
Charles Chan82f19972016-05-17 13:13:55 -07001608 !srManager.xConnectHandler.hasXConnect(new ConnectPoint(ingressDevice, port))) {
Pier1a7e0c02018-03-12 15:00:54 -07001609 portBuilder.add(port);
Charles Chan72779502016-04-23 17:36:10 -07001610 }
1611 }
1612 }
Pier1a7e0c02018-03-12 15:00:54 -07001613 return portBuilder.build();
Charles Chan72779502016-04-23 17:36:10 -07001614 }
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001615
1616 /**
Pier28164682018-04-17 15:50:43 +02001617 * Verify if a given connect point is sink for this group.
1618 *
1619 * @param mcastIp group address
1620 * @param connectPoint connect point to be verified
Piere99511d2018-04-19 16:47:06 +02001621 * @param source source connect point
Pier28164682018-04-17 15:50:43 +02001622 * @return true if the connect point is sink of the group
1623 */
Andrea Campanellaef30d7a2018-04-27 14:44:15 +02001624 private boolean isSinkForGroup(IpAddress mcastIp, ConnectPoint connectPoint, ConnectPoint source) {
Piere99511d2018-04-19 16:47:06 +02001625 VlanId assignedVlan = mcastUtils.assignedVlan(connectPoint.deviceId().equals(source.deviceId()) ?
1626 source : null);
1627 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, connectPoint.deviceId(), assignedVlan);
Pier28164682018-04-17 15:50:43 +02001628 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
1629 return false;
1630 }
Pier28164682018-04-17 15:50:43 +02001631 NextObjective mcastNext = mcastNextObjStore.get(mcastStoreKey).value();
1632 return mcastUtils.getPorts(mcastNext.next()).contains(connectPoint.port());
1633 }
1634
1635 /**
Piere99511d2018-04-19 16:47:06 +02001636 * Verify if a given connect point is sink for this group and for this source.
1637 *
1638 * @param mcastIp group address
1639 * @param connectPoint connect point to be verified
1640 * @param source source connect point
1641 * @return true if the connect point is sink of the group
1642 */
Andrea Campanellaef30d7a2018-04-27 14:44:15 +02001643 private boolean isSinkForSource(IpAddress mcastIp, ConnectPoint connectPoint, ConnectPoint source) {
Piere99511d2018-04-19 16:47:06 +02001644 boolean isSink = isSinkForGroup(mcastIp, connectPoint, source);
1645 DeviceId device;
1646 if (connectPoint.deviceId().equals(source.deviceId())) {
1647 device = getDevice(mcastIp, INGRESS, source).stream()
1648 .filter(deviceId -> deviceId.equals(connectPoint.deviceId()))
1649 .findFirst().orElse(null);
1650 } else {
1651 device = getDevice(mcastIp, EGRESS, source).stream()
1652 .filter(deviceId -> deviceId.equals(connectPoint.deviceId()))
1653 .findFirst().orElse(null);
1654 }
1655 return isSink && device != null;
1656 }
1657
1658 /**
1659 * Verify if a sink is reachable from this source.
1660 *
1661 * @param mcastIp group address
1662 * @param sink connect point to be verified
1663 * @param source source connect point
1664 * @return true if the connect point is reachable from the source
1665 */
Andrea Campanellaef30d7a2018-04-27 14:44:15 +02001666 private boolean isSinkReachable(IpAddress mcastIp, ConnectPoint sink, ConnectPoint source) {
Piere99511d2018-04-19 16:47:06 +02001667 return sink.deviceId().equals(source.deviceId()) ||
1668 getPath(source.deviceId(), sink.deviceId(), mcastIp, null, source).isPresent();
1669 }
1670
1671 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +01001672 * Updates filtering objective for given device and port.
1673 * It is called in general when the mcast config has been
1674 * changed.
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001675 *
1676 * @param deviceId device ID
1677 * @param portNum ingress port number
1678 * @param vlanId assigned VLAN ID
1679 * @param install true to add, false to remove
1680 */
Andrea Campanellaef30d7a2018-04-27 14:44:15 +02001681 public void updateFilterToDevice(DeviceId deviceId, PortNumber portNum, VlanId vlanId, boolean install) {
Pier Luigi35dab3f2018-01-25 16:16:02 +01001682 lastMcastChange = Instant.now();
1683 mcastLock();
1684 try {
Piere99511d2018-04-19 16:47:06 +02001685 // Iterates over the route and updates properly the filtering objective on the source device.
Pier Luigi35dab3f2018-01-25 16:16:02 +01001686 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
Pierdb27b8d2018-04-17 16:29:56 +02001687 log.debug("Update filter for {}", mcastRoute.group());
Pierdb27b8d2018-04-17 16:29:56 +02001688 if (!mcastUtils.isLeader(mcastRoute.group())) {
1689 log.debug("Skip {} due to lack of leadership", mcastRoute.group());
1690 return;
1691 }
Piere99511d2018-04-19 16:47:06 +02001692 // Get the sources and for each one update properly the filtering objectives
1693 Set<ConnectPoint> sources = srManager.multicastRouteService.sources(mcastRoute);
1694 sources.forEach(source -> {
1695 if (source.deviceId().equals(deviceId) && source.port().equals(portNum)) {
1696 if (install) {
1697 mcastUtils.addFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), INGRESS);
1698 } else {
1699 mcastUtils.removeFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), null);
1700 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001701 }
Piere99511d2018-04-19 16:47:06 +02001702 });
Pier Luigi35dab3f2018-01-25 16:16:02 +01001703 });
1704 } finally {
1705 mcastUnlock();
1706 }
1707 }
1708
1709 /**
1710 * Performs bucket verification operation for all mcast groups in the devices.
1711 * Firstly, it verifies that mcast is stable before trying verification operation.
1712 * Verification consists in creating new nexts with VERIFY operation. Actually,
1713 * the operation is totally delegated to the driver.
1714 */
Piere99511d2018-04-19 16:47:06 +02001715 private final class McastBucketCorrector implements Runnable {
Pier Luigi35dab3f2018-01-25 16:16:02 +01001716
1717 @Override
1718 public void run() {
Pier Luigi35dab3f2018-01-25 16:16:02 +01001719 if (!isMcastStable()) {
1720 return;
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001721 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001722 mcastLock();
1723 try {
1724 // Iterates over the routes and verify the related next objectives
1725 srManager.multicastRouteService.getRoutes()
Piere99511d2018-04-19 16:47:06 +02001726 .stream().map(McastRoute::group)
Pier Luigi35dab3f2018-01-25 16:16:02 +01001727 .forEach(mcastIp -> {
Piere99511d2018-04-19 16:47:06 +02001728 log.trace("Running mcast buckets corrector for mcast group: {}", mcastIp);
1729 // Verify leadership on the operation
1730 if (!mcastUtils.isLeader(mcastIp)) {
1731 log.trace("Skip {} due to lack of leadership", mcastIp);
1732 return;
1733 }
1734 // Get sources and sinks from Mcast Route Service and warn about errors
1735 Set<ConnectPoint> sources = mcastUtils.getSources(mcastIp);
Pier7b657162018-03-27 11:29:42 -07001736 Set<ConnectPoint> sinks = mcastUtils.getSinks(mcastIp).values().stream()
Piere99511d2018-04-19 16:47:06 +02001737 .flatMap(Collection::stream).collect(Collectors.toSet());
1738 // Do not proceed if sources of this group are missing
1739 if (sources.isEmpty()) {
Pier Luigi92e69be2018-03-02 12:53:37 +01001740 if (!sinks.isEmpty()) {
1741 log.warn("Unable to run buckets corrector. " +
Piere99511d2018-04-19 16:47:06 +02001742 "Missing source {} for group {}", sources, mcastIp);
Pier Luigi92e69be2018-03-02 12:53:37 +01001743 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001744 return;
1745 }
Piere99511d2018-04-19 16:47:06 +02001746 sources.forEach(source -> {
1747 // For each group we get current information in the store
1748 // and issue a check of the next objectives in place
1749 Set<DeviceId> ingressDevices = getDevice(mcastIp, INGRESS, source);
1750 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
1751 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
1752 // Do not proceed if ingress devices are missing
1753 if (ingressDevices.isEmpty()) {
1754 if (!sinks.isEmpty()) {
1755 log.warn("Unable to run buckets corrector. " +
1756 "Missing ingress {} for source {} and for group {}",
1757 ingressDevices, source, mcastIp);
1758 }
1759 return;
Pier Luigi35dab3f2018-01-25 16:16:02 +01001760 }
Piere99511d2018-04-19 16:47:06 +02001761 // Create the set of the devices to be processed
1762 ImmutableSet.Builder<DeviceId> devicesBuilder = ImmutableSet.builder();
1763 if (!ingressDevices.isEmpty()) {
1764 devicesBuilder.addAll(ingressDevices);
1765 }
1766 if (!transitDevices.isEmpty()) {
1767 devicesBuilder.addAll(transitDevices);
1768 }
1769 if (!egressDevices.isEmpty()) {
1770 devicesBuilder.addAll(egressDevices);
1771 }
1772 Set<DeviceId> devicesToProcess = devicesBuilder.build();
1773 devicesToProcess.forEach(deviceId -> {
1774 VlanId assignedVlan = mcastUtils.assignedVlan(deviceId.equals(source.deviceId()) ?
1775 source : null);
1776 McastStoreKey currentKey = new McastStoreKey(mcastIp, deviceId, assignedVlan);
1777 if (mcastNextObjStore.containsKey(currentKey)) {
1778 NextObjective currentNext = mcastNextObjStore.get(currentKey).value();
1779 // Rebuild the next objective using assigned vlan
1780 currentNext = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
1781 mcastUtils.getPorts(currentNext.next()), currentNext.id()).verify();
1782 // Send to the flowobjective service
1783 srManager.flowObjectiveService.next(deviceId, currentNext);
1784 } else {
1785 log.warn("Unable to run buckets corrector. " +
1786 "Missing next for {}, for source {} and for group {}",
1787 deviceId, source, mcastIp);
1788 }
1789 });
Pier Luigi35dab3f2018-01-25 16:16:02 +01001790 });
Pier Luigi35dab3f2018-01-25 16:16:02 +01001791 });
1792 } finally {
Pier Luigi35dab3f2018-01-25 16:16:02 +01001793 mcastUnlock();
1794 }
1795
1796 }
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001797 }
Pier Luigi0f9635b2018-01-15 18:06:43 +01001798
Piere99511d2018-04-19 16:47:06 +02001799 /**
1800 * Returns the associated next ids to the mcast groups or to the single
1801 * group if mcastIp is present.
1802 *
1803 * @param mcastIp the group ip
1804 * @return the mapping mcastIp-device to next id
1805 */
Pier Luigi0f9635b2018-01-15 18:06:43 +01001806 public Map<McastStoreKey, Integer> getMcastNextIds(IpAddress mcastIp) {
Pier Luigi0f9635b2018-01-15 18:06:43 +01001807 if (mcastIp != null) {
1808 return mcastNextObjStore.entrySet().stream()
1809 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
Piere99511d2018-04-19 16:47:06 +02001810 .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().value().id()));
Pier Luigi0f9635b2018-01-15 18:06:43 +01001811 }
Pier Luigi0f9635b2018-01-15 18:06:43 +01001812 return mcastNextObjStore.entrySet().stream()
Piere99511d2018-04-19 16:47:06 +02001813 .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().value().id()));
Pier Luigi0f9635b2018-01-15 18:06:43 +01001814 }
1815
Pier71c55772018-04-17 17:25:22 +02001816 /**
1817 * Returns the associated roles to the mcast groups or to the single
1818 * group if mcastIp is present.
1819 *
1820 * @param mcastIp the group ip
1821 * @return the mapping mcastIp-device to mcast role
1822 *
1823 * @deprecated in 1.12 ("Magpie") release.
1824 */
1825 @Deprecated
Pier Luigi69f774d2018-02-28 12:10:50 +01001826 public Map<McastStoreKey, McastRole> getMcastRoles(IpAddress mcastIp) {
Pier Luigi0f9635b2018-01-15 18:06:43 +01001827 if (mcastIp != null) {
1828 return mcastRoleStore.entrySet().stream()
1829 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
Piere99511d2018-04-19 16:47:06 +02001830 .collect(Collectors.toMap(entry -> new McastStoreKey(entry.getKey().mcastIp(),
1831 entry.getKey().deviceId(), null), entry -> entry.getValue().value()));
Pier Luigi0f9635b2018-01-15 18:06:43 +01001832 }
Pier Luigi0f9635b2018-01-15 18:06:43 +01001833 return mcastRoleStore.entrySet().stream()
Piere99511d2018-04-19 16:47:06 +02001834 .collect(Collectors.toMap(entry -> new McastStoreKey(entry.getKey().mcastIp(),
1835 entry.getKey().deviceId(), null), entry -> entry.getValue().value()));
Pier Luigi0f9635b2018-01-15 18:06:43 +01001836 }
1837
Pier71c55772018-04-17 17:25:22 +02001838 /**
Piere99511d2018-04-19 16:47:06 +02001839 * Returns the associated roles to the mcast groups.
1840 *
1841 * @param mcastIp the group ip
1842 * @param sourcecp the source connect point
1843 * @return the mapping mcastIp-device to mcast role
1844 */
Andrea Campanellaef30d7a2018-04-27 14:44:15 +02001845 public Map<McastRoleStoreKey, McastRole> getMcastRoles(IpAddress mcastIp, ConnectPoint sourcecp) {
Piere99511d2018-04-19 16:47:06 +02001846 if (mcastIp != null) {
1847 Map<McastRoleStoreKey, McastRole> roles = mcastRoleStore.entrySet().stream()
1848 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
1849 .collect(Collectors.toMap(entry -> new McastRoleStoreKey(entry.getKey().mcastIp(),
1850 entry.getKey().deviceId(), entry.getKey().source()), entry -> entry.getValue().value()));
1851 if (sourcecp != null) {
1852 roles = roles.entrySet().stream()
1853 .filter(mcastEntry -> sourcecp.equals(mcastEntry.getKey().source()))
1854 .collect(Collectors.toMap(entry -> new McastRoleStoreKey(entry.getKey().mcastIp(),
1855 entry.getKey().deviceId(), entry.getKey().source()), Entry::getValue));
1856 }
1857 return roles;
1858 }
1859 return mcastRoleStore.entrySet().stream()
1860 .collect(Collectors.toMap(entry -> new McastRoleStoreKey(entry.getKey().mcastIp(),
1861 entry.getKey().deviceId(), entry.getKey().source()), entry -> entry.getValue().value()));
1862 }
1863
1864
1865 /**
Pier71c55772018-04-17 17:25:22 +02001866 * Returns the associated paths to the mcast group.
1867 *
1868 * @param mcastIp the group ip
1869 * @return the mapping egress point to mcast path
1870 *
1871 * @deprecated in 1.12 ("Magpie") release.
1872 */
1873 @Deprecated
Pier Luigi0f9635b2018-01-15 18:06:43 +01001874 public Map<ConnectPoint, List<ConnectPoint>> getMcastPaths(IpAddress mcastIp) {
1875 Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
Pier7b657162018-03-27 11:29:42 -07001876 ConnectPoint source = mcastUtils.getSource(mcastIp);
Pier Luigi0f9635b2018-01-15 18:06:43 +01001877 if (source != null) {
Pier Luigi0f9635b2018-01-15 18:06:43 +01001878 Set<DeviceId> visited = Sets.newHashSet();
Piere99511d2018-04-19 16:47:06 +02001879 List<ConnectPoint> currentPath = Lists.newArrayList(source);
1880 buildMcastPaths(source.deviceId(), visited, mcastPaths, currentPath, mcastIp, source);
Pier Luigi0f9635b2018-01-15 18:06:43 +01001881 }
1882 return mcastPaths;
1883 }
1884
Pier71c55772018-04-17 17:25:22 +02001885 /**
1886 * Returns the associated trees to the mcast group.
1887 *
1888 * @param mcastIp the group ip
1889 * @param sourcecp the source connect point
1890 * @return the mapping egress point to mcast path
1891 */
Andrea Campanellaef30d7a2018-04-27 14:44:15 +02001892 public Multimap<ConnectPoint, List<ConnectPoint>> getMcastTrees(IpAddress mcastIp, ConnectPoint sourcecp) {
Pier71c55772018-04-17 17:25:22 +02001893 Multimap<ConnectPoint, List<ConnectPoint>> mcastTrees = HashMultimap.create();
Pier71c55772018-04-17 17:25:22 +02001894 Set<ConnectPoint> sources = mcastUtils.getSources(mcastIp);
Pier71c55772018-04-17 17:25:22 +02001895 if (sourcecp != null) {
1896 sources = sources.stream()
Piere99511d2018-04-19 16:47:06 +02001897 .filter(source -> source.equals(sourcecp)).collect(Collectors.toSet());
Pier71c55772018-04-17 17:25:22 +02001898 }
Pier71c55772018-04-17 17:25:22 +02001899 if (!sources.isEmpty()) {
1900 sources.forEach(source -> {
Pier71c55772018-04-17 17:25:22 +02001901 Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
1902 Set<DeviceId> visited = Sets.newHashSet();
1903 List<ConnectPoint> currentPath = Lists.newArrayList(source);
Piere99511d2018-04-19 16:47:06 +02001904 buildMcastPaths(source.deviceId(), visited, mcastPaths, currentPath, mcastIp, source);
Pier71c55772018-04-17 17:25:22 +02001905 mcastPaths.forEach(mcastTrees::put);
1906 });
1907 }
1908 return mcastTrees;
1909 }
1910
1911 /**
1912 * Build recursively the mcast paths.
1913 *
1914 * @param toVisit the node to visit
1915 * @param visited the visited nodes
1916 * @param mcastPaths the current mcast paths
1917 * @param currentPath the current path
1918 * @param mcastIp the group ip
Piere99511d2018-04-19 16:47:06 +02001919 * @param source the source
Pier71c55772018-04-17 17:25:22 +02001920 */
Andrea Campanellaef30d7a2018-04-27 14:44:15 +02001921 private void buildMcastPaths(DeviceId toVisit, Set<DeviceId> visited, Map<ConnectPoint,
1922 List<ConnectPoint>> mcastPaths, List<ConnectPoint> currentPath,
1923 IpAddress mcastIp, ConnectPoint source) {
Piere99511d2018-04-19 16:47:06 +02001924 // If we have visited the node to visit there is a loop
Pier Luigi0f9635b2018-01-15 18:06:43 +01001925 if (visited.contains(toVisit)) {
1926 return;
1927 }
1928 // Visit next-hop
1929 visited.add(toVisit);
Piere99511d2018-04-19 16:47:06 +02001930 VlanId assignedVlan = mcastUtils.assignedVlan(toVisit.equals(source.deviceId()) ? source : null);
1931 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, toVisit, assignedVlan);
Pier Luigi0f9635b2018-01-15 18:06:43 +01001932 // Looking for next-hops
1933 if (mcastNextObjStore.containsKey(mcastStoreKey)) {
Piere99511d2018-04-19 16:47:06 +02001934 // Build egress connect points, get ports and build relative cps
Pier Luigi0f9635b2018-01-15 18:06:43 +01001935 NextObjective nextObjective = mcastNextObjStore.get(mcastStoreKey).value();
Pier7b657162018-03-27 11:29:42 -07001936 Set<PortNumber> outputPorts = mcastUtils.getPorts(nextObjective.next());
Pier Luigi0f9635b2018-01-15 18:06:43 +01001937 ImmutableSet.Builder<ConnectPoint> cpBuilder = ImmutableSet.builder();
1938 outputPorts.forEach(portNumber -> cpBuilder.add(new ConnectPoint(toVisit, portNumber)));
1939 Set<ConnectPoint> egressPoints = cpBuilder.build();
Pier Luigi0f9635b2018-01-15 18:06:43 +01001940 Set<Link> egressLinks;
1941 List<ConnectPoint> newCurrentPath;
1942 Set<DeviceId> newVisited;
1943 DeviceId newToVisit;
1944 for (ConnectPoint egressPoint : egressPoints) {
1945 egressLinks = srManager.linkService.getEgressLinks(egressPoint);
1946 // If it does not have egress links, stop
1947 if (egressLinks.isEmpty()) {
1948 // Add the connect points to the path
1949 newCurrentPath = Lists.newArrayList(currentPath);
1950 newCurrentPath.add(0, egressPoint);
Pier Luigi0f9635b2018-01-15 18:06:43 +01001951 mcastPaths.put(egressPoint, newCurrentPath);
1952 } else {
1953 newVisited = Sets.newHashSet(visited);
1954 // Iterate over the egress links for the next hops
1955 for (Link egressLink : egressLinks) {
Pier Luigi0f9635b2018-01-15 18:06:43 +01001956 newToVisit = egressLink.dst().deviceId();
Pier Luigi0f9635b2018-01-15 18:06:43 +01001957 newCurrentPath = Lists.newArrayList(currentPath);
1958 newCurrentPath.add(0, egressPoint);
1959 newCurrentPath.add(0, egressLink.dst());
Piere99511d2018-04-19 16:47:06 +02001960 buildMcastPaths(newToVisit, newVisited, mcastPaths, newCurrentPath, mcastIp, source);
Pier Luigi0f9635b2018-01-15 18:06:43 +01001961 }
1962 }
1963 }
1964 }
1965 }
1966
Pierdb27b8d2018-04-17 16:29:56 +02001967 /**
1968 * Return the leaders of the mcast groups.
1969 *
1970 * @param mcastIp the group ip
1971 * @return the mapping group-node
1972 */
1973 public Map<IpAddress, NodeId> getMcastLeaders(IpAddress mcastIp) {
1974 return mcastUtils.getMcastLeaders(mcastIp);
1975 }
Charles Chanc91c8782016-03-30 17:54:24 -07001976}