blob: f7e9cb363aff74c8ce972d3ae7d3287c5c1bf663 [file] [log] [blame]
Charles Chanc91c8782016-03-30 17:54:24 -07001/*
Pier Luigi69f774d2018-02-28 12:10:50 +01002 * Copyright 2018-present Open Networking Foundation
Charles Chanc91c8782016-03-30 17:54:24 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Pier Luigi69f774d2018-02-28 12:10:50 +010017package org.onosproject.segmentrouting.mcast;
Charles Chanc91c8782016-03-30 17:54:24 -070018
Piere99511d2018-04-19 16:47:06 +020019import com.google.common.base.Objects;
Pier Luigid29ca7c2018-02-28 17:24:03 +010020import com.google.common.cache.Cache;
21import com.google.common.cache.CacheBuilder;
22import com.google.common.cache.RemovalCause;
23import com.google.common.cache.RemovalNotification;
Pier71c55772018-04-17 17:25:22 +020024import com.google.common.collect.HashMultimap;
Pier7b657162018-03-27 11:29:42 -070025import com.google.common.collect.ImmutableList;
Charles Chanc91c8782016-03-30 17:54:24 -070026import com.google.common.collect.ImmutableSet;
27import com.google.common.collect.Lists;
Pier Luigi91573e12018-01-23 16:06:38 +010028import com.google.common.collect.Maps;
Pier71c55772018-04-17 17:25:22 +020029import com.google.common.collect.Multimap;
Charles Chanc91c8782016-03-30 17:54:24 -070030import com.google.common.collect.Sets;
Charles Chanc91c8782016-03-30 17:54:24 -070031import org.onlab.packet.IpAddress;
Charles Chanc91c8782016-03-30 17:54:24 -070032import org.onlab.packet.VlanId;
33import org.onlab.util.KryoNamespace;
Pierdb27b8d2018-04-17 16:29:56 +020034import org.onosproject.cluster.NodeId;
Charles Chanc91c8782016-03-30 17:54:24 -070035import org.onosproject.core.ApplicationId;
36import org.onosproject.core.CoreService;
Pier1f87aca2018-03-14 16:47:32 -070037import org.onosproject.mcast.api.McastEvent;
38import org.onosproject.mcast.api.McastRoute;
Pier7b657162018-03-27 11:29:42 -070039import org.onosproject.mcast.api.McastRouteData;
Pier1f87aca2018-03-14 16:47:32 -070040import org.onosproject.mcast.api.McastRouteUpdate;
Charles Chanba59dd62018-05-10 22:19:49 +000041import org.onosproject.net.HostId;
Charles Chanc91c8782016-03-30 17:54:24 -070042import org.onosproject.net.ConnectPoint;
43import org.onosproject.net.DeviceId;
44import org.onosproject.net.Link;
45import org.onosproject.net.Path;
46import org.onosproject.net.PortNumber;
Charles Chan72779502016-04-23 17:36:10 -070047import org.onosproject.net.flowobjective.DefaultObjectiveContext;
Charles Chanc91c8782016-03-30 17:54:24 -070048import org.onosproject.net.flowobjective.ForwardingObjective;
49import org.onosproject.net.flowobjective.NextObjective;
Charles Chan72779502016-04-23 17:36:10 -070050import org.onosproject.net.flowobjective.ObjectiveContext;
Pier1f87aca2018-03-14 16:47:32 -070051import org.onosproject.net.topology.LinkWeigher;
Pier Luigid8a15162018-02-15 16:33:08 +010052import org.onosproject.net.topology.Topology;
Charles Chanc91c8782016-03-30 17:54:24 -070053import org.onosproject.net.topology.TopologyService;
Pier1f87aca2018-03-14 16:47:32 -070054import org.onosproject.segmentrouting.SRLinkWeigher;
Pier Luigi69f774d2018-02-28 12:10:50 +010055import org.onosproject.segmentrouting.SegmentRoutingManager;
Charles Chanc91c8782016-03-30 17:54:24 -070056import org.onosproject.store.serializers.KryoNamespaces;
57import org.onosproject.store.service.ConsistentMap;
58import org.onosproject.store.service.Serializer;
Andrea Campanella5b4cd652018-06-05 14:19:21 +020059import org.onosproject.store.service.Versioned;
Charles Chanc91c8782016-03-30 17:54:24 -070060import org.slf4j.Logger;
61import org.slf4j.LoggerFactory;
62
Pier Luigi35dab3f2018-01-25 16:16:02 +010063import java.time.Instant;
Charles Chanc91c8782016-03-30 17:54:24 -070064import java.util.Collection;
65import java.util.Collections;
Pier Luigi91573e12018-01-23 16:06:38 +010066import java.util.Comparator;
Charles Chanc91c8782016-03-30 17:54:24 -070067import java.util.List;
Charles Chan72779502016-04-23 17:36:10 -070068import java.util.Map;
Pier1f87aca2018-03-14 16:47:32 -070069import java.util.Map.Entry;
Charles Chanc91c8782016-03-30 17:54:24 -070070import java.util.Optional;
71import java.util.Set;
Pier Luigi35dab3f2018-01-25 16:16:02 +010072import java.util.concurrent.ScheduledExecutorService;
73import java.util.concurrent.TimeUnit;
pierc32ef422020-01-27 17:45:03 +010074import java.util.concurrent.atomic.AtomicInteger;
Pier Luigi35dab3f2018-01-25 16:16:02 +010075import java.util.concurrent.locks.Lock;
76import java.util.concurrent.locks.ReentrantLock;
Charles Chan72779502016-04-23 17:36:10 -070077import java.util.stream.Collectors;
78
Pier Luigi35dab3f2018-01-25 16:16:02 +010079import static java.util.concurrent.Executors.newScheduledThreadPool;
80import static org.onlab.util.Tools.groupedThreads;
Charles Chanba59dd62018-05-10 22:19:49 +000081
Pierdb27b8d2018-04-17 16:29:56 +020082import static org.onosproject.mcast.api.McastEvent.Type.ROUTE_ADDED;
Pier7b657162018-03-27 11:29:42 -070083import static org.onosproject.mcast.api.McastEvent.Type.ROUTE_REMOVED;
Andrea Campanellaef30d7a2018-04-27 14:44:15 +020084import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_ADDED;
85import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_REMOVED;
Charles Chanba59dd62018-05-10 22:19:49 +000086import static org.onosproject.mcast.api.McastEvent.Type.SINKS_ADDED;
87import static org.onosproject.mcast.api.McastEvent.Type.SINKS_REMOVED;
88
Pier979e61a2018-03-07 11:42:50 +010089import static org.onosproject.segmentrouting.mcast.McastRole.EGRESS;
90import static org.onosproject.segmentrouting.mcast.McastRole.INGRESS;
91import static org.onosproject.segmentrouting.mcast.McastRole.TRANSIT;
Charles Chanc91c8782016-03-30 17:54:24 -070092
93/**
Pier Luigi69f774d2018-02-28 12:10:50 +010094 * Handles Multicast related events.
Charles Chanc91c8782016-03-30 17:54:24 -070095 */
Charles Chan1eaf4802016-04-18 13:44:03 -070096public class McastHandler {
97 private static final Logger log = LoggerFactory.getLogger(McastHandler.class);
Charles Chanc91c8782016-03-30 17:54:24 -070098 private final SegmentRoutingManager srManager;
Charles Chan82f19972016-05-17 13:13:55 -070099 private final TopologyService topologyService;
Pierdb27b8d2018-04-17 16:29:56 +0200100 private final McastUtils mcastUtils;
Charles Chan72779502016-04-23 17:36:10 -0700101 private final ConsistentMap<McastStoreKey, NextObjective> mcastNextObjStore;
Piere99511d2018-04-19 16:47:06 +0200102 private final ConsistentMap<McastRoleStoreKey, McastRole> mcastRoleStore;
Charles Chan72779502016-04-23 17:36:10 -0700103
Pier Luigid29ca7c2018-02-28 17:24:03 +0100104 // Wait time for the cache
105 private static final int WAIT_TIME_MS = 1000;
Pier7b657162018-03-27 11:29:42 -0700106
Piere99511d2018-04-19 16:47:06 +0200107 //The mcastEventCache is implemented to avoid race condition by giving more time
108 // to the underlying subsystems to process previous calls.
Pier Luigid29ca7c2018-02-28 17:24:03 +0100109 private Cache<McastCacheKey, McastEvent> mcastEventCache = CacheBuilder.newBuilder()
110 .expireAfterWrite(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
111 .removalListener((RemovalNotification<McastCacheKey, McastEvent> notification) -> {
Pier Luigid29ca7c2018-02-28 17:24:03 +0100112 IpAddress mcastIp = notification.getKey().mcastIp();
Pier7b657162018-03-27 11:29:42 -0700113 HostId sink = notification.getKey().sinkHost();
Pier Luigid29ca7c2018-02-28 17:24:03 +0100114 McastEvent mcastEvent = notification.getValue();
115 RemovalCause cause = notification.getCause();
116 log.debug("mcastEventCache removal event. group={}, sink={}, mcastEvent={}, cause={}",
117 mcastIp, sink, mcastEvent, cause);
Piere99511d2018-04-19 16:47:06 +0200118 // If it expires or it has been replaced, we deque the event - no when evicted
Pier Luigid29ca7c2018-02-28 17:24:03 +0100119 switch (notification.getCause()) {
120 case REPLACED:
121 case EXPIRED:
122 dequeueMcastEvent(mcastEvent);
123 break;
124 default:
125 break;
126 }
127 }).build();
128
129 private void enqueueMcastEvent(McastEvent mcastEvent) {
Pier1f87aca2018-03-14 16:47:32 -0700130 final McastRouteUpdate mcastRouteUpdate = mcastEvent.subject();
Pier7b657162018-03-27 11:29:42 -0700131 final McastRouteUpdate mcastRoutePrevUpdate = mcastEvent.prevSubject();
132 final IpAddress group = mcastRoutePrevUpdate.route().group();
Pier7b657162018-03-27 11:29:42 -0700133 ImmutableSet.Builder<HostId> sinksBuilder = ImmutableSet.builder();
Pier1f87aca2018-03-14 16:47:32 -0700134 if (mcastEvent.type() == SOURCES_ADDED ||
135 mcastEvent.type() == SOURCES_REMOVED) {
Piere99511d2018-04-19 16:47:06 +0200136 // Current subject and prev just differ for the source connect points
137 sinksBuilder.addAll(mcastRouteUpdate.sinks().keySet());
Pier7b657162018-03-27 11:29:42 -0700138 } else if (mcastEvent.type() == SINKS_ADDED) {
Pier7b657162018-03-27 11:29:42 -0700139 mcastRouteUpdate.sinks().forEach(((hostId, connectPoints) -> {
140 // Get the previous locations and verify if there are changes
141 Set<ConnectPoint> prevConnectPoints = mcastRoutePrevUpdate.sinks().get(hostId);
142 Set<ConnectPoint> changes = Sets.difference(connectPoints, prevConnectPoints != null ?
143 prevConnectPoints : Collections.emptySet());
144 if (!changes.isEmpty()) {
145 sinksBuilder.add(hostId);
Pier1f87aca2018-03-14 16:47:32 -0700146 }
Pier7b657162018-03-27 11:29:42 -0700147 }));
148 } else if (mcastEvent.type() == SINKS_REMOVED) {
Pier7b657162018-03-27 11:29:42 -0700149 mcastRoutePrevUpdate.sinks().forEach(((hostId, connectPoints) -> {
150 // Get the current locations and verify if there are changes
151 Set<ConnectPoint> currentConnectPoints = mcastRouteUpdate.sinks().get(hostId);
152 Set<ConnectPoint> changes = Sets.difference(connectPoints, currentConnectPoints != null ?
153 currentConnectPoints : Collections.emptySet());
154 if (!changes.isEmpty()) {
155 sinksBuilder.add(hostId);
156 }
157 }));
158 } else if (mcastEvent.type() == ROUTE_REMOVED) {
159 // Current subject is null, just take the previous host ids
160 sinksBuilder.addAll(mcastRoutePrevUpdate.sinks().keySet());
Pier Luigid29ca7c2018-02-28 17:24:03 +0100161 }
Pier Luigid29ca7c2018-02-28 17:24:03 +0100162 sinksBuilder.build().forEach(sink -> {
Pier1f87aca2018-03-14 16:47:32 -0700163 McastCacheKey cacheKey = new McastCacheKey(group, sink);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100164 mcastEventCache.put(cacheKey, mcastEvent);
165 });
166 }
167
168 private void dequeueMcastEvent(McastEvent mcastEvent) {
Pier7b657162018-03-27 11:29:42 -0700169 final McastRouteUpdate mcastUpdate = mcastEvent.subject();
170 final McastRouteUpdate mcastPrevUpdate = mcastEvent.prevSubject();
Pier7b657162018-03-27 11:29:42 -0700171 IpAddress mcastIp = mcastPrevUpdate.route().group();
Pier7b657162018-03-27 11:29:42 -0700172 Set<ConnectPoint> prevSinks = mcastPrevUpdate.sinks()
Piere99511d2018-04-19 16:47:06 +0200173 .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
174 Set<ConnectPoint> prevSources = mcastPrevUpdate.sources()
175 .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
176 Set<ConnectPoint> sources;
Pier Luigid29ca7c2018-02-28 17:24:03 +0100177 switch (mcastEvent.type()) {
Pier1f87aca2018-03-14 16:47:32 -0700178 case SOURCES_ADDED:
Piere99511d2018-04-19 16:47:06 +0200179 sources = mcastUpdate.sources()
180 .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
181 Set<ConnectPoint> sourcesToBeAdded = Sets.difference(sources, prevSources);
182 processSourcesAddedInternal(sourcesToBeAdded, mcastIp, mcastUpdate.sinks());
Pier Luigid29ca7c2018-02-28 17:24:03 +0100183 break;
Pier1f87aca2018-03-14 16:47:32 -0700184 case SOURCES_REMOVED:
Piere99511d2018-04-19 16:47:06 +0200185 sources = mcastUpdate.sources()
186 .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
187 Set<ConnectPoint> sourcesToBeRemoved = Sets.difference(prevSources, sources);
188 processSourcesRemovedInternal(sourcesToBeRemoved, sources, mcastIp, mcastUpdate.sinks());
Pier Luigid29ca7c2018-02-28 17:24:03 +0100189 break;
190 case ROUTE_REMOVED:
Piere99511d2018-04-19 16:47:06 +0200191 processRouteRemovedInternal(prevSources, mcastIp);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100192 break;
Pier1f87aca2018-03-14 16:47:32 -0700193 case SINKS_ADDED:
Piere99511d2018-04-19 16:47:06 +0200194 processSinksAddedInternal(prevSources, mcastIp, mcastUpdate.sinks(), prevSinks);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100195 break;
Pier1f87aca2018-03-14 16:47:32 -0700196 case SINKS_REMOVED:
Piere99511d2018-04-19 16:47:06 +0200197 processSinksRemovedInternal(prevSources, mcastIp, mcastUpdate.sinks(), mcastPrevUpdate.sinks());
Pier Luigid29ca7c2018-02-28 17:24:03 +0100198 break;
199 default:
200 break;
201 }
202 }
203
Pier Luigi35dab3f2018-01-25 16:16:02 +0100204 // Mcast lock to serialize local operations
205 private final Lock mcastLock = new ReentrantLock();
Pier Luigi35dab3f2018-01-25 16:16:02 +0100206 private void mcastLock() {
207 mcastLock.lock();
208 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100209 private void mcastUnlock() {
210 mcastLock.unlock();
211 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100212 // Stability threshold for Mcast. Seconds
213 private static final long MCAST_STABLITY_THRESHOLD = 5;
214 // Last change done
215 private Instant lastMcastChange = Instant.now();
pierc32ef422020-01-27 17:45:03 +0100216 // Last bucker corrector execution
217 private Instant lastBktCorrExecution = Instant.now();
Pier Luigi35dab3f2018-01-25 16:16:02 +0100218
219 /**
220 * Determines if mcast in the network has been stable in the last
221 * MCAST_STABLITY_THRESHOLD seconds, by comparing the current time
222 * to the last mcast change timestamp.
223 *
224 * @return true if stable
225 */
226 private boolean isMcastStable() {
227 long last = (long) (lastMcastChange.toEpochMilli() / 1000.0);
228 long now = (long) (Instant.now().toEpochMilli() / 1000.0);
pierc32ef422020-01-27 17:45:03 +0100229 log.trace("Multicast stable since {}s", now - last);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100230 return (now - last) > MCAST_STABLITY_THRESHOLD;
231 }
232
pierc32ef422020-01-27 17:45:03 +0100233 /**
234 * Assures there are always MCAST_VERIFY_INTERVAL seconds between each execution,
235 * by comparing the current time with the last corrector execution.
236 *
237 * @return true if stable
238 */
239 private boolean wasBktCorrRunning() {
240 long last = (long) (lastBktCorrExecution.toEpochMilli() / 1000.0);
241 long now = (long) (Instant.now().toEpochMilli() / 1000.0);
242 log.trace("McastBucketCorrector executed {}s ago", now - last);
243 return (now - last) < MCAST_VERIFY_INTERVAL;
244 }
245
Piere99511d2018-04-19 16:47:06 +0200246 // Verify interval for Mcast bucket corrector
Pier Luigi35dab3f2018-01-25 16:16:02 +0100247 private static final long MCAST_VERIFY_INTERVAL = 30;
Piere99511d2018-04-19 16:47:06 +0200248 // Executor for mcast bucket corrector and for cache
Pier Luigi35dab3f2018-01-25 16:16:02 +0100249 private ScheduledExecutorService executorService
Pier Luigid29ca7c2018-02-28 17:24:03 +0100250 = newScheduledThreadPool(1, groupedThreads("mcastWorker", "mcastWorker-%d", log));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100251
Charles Chan72779502016-04-23 17:36:10 -0700252 /**
Charles Chanc91c8782016-03-30 17:54:24 -0700253 * Constructs the McastEventHandler.
254 *
255 * @param srManager Segment Routing manager
256 */
Charles Chan1eaf4802016-04-18 13:44:03 -0700257 public McastHandler(SegmentRoutingManager srManager) {
Pier7b657162018-03-27 11:29:42 -0700258 ApplicationId coreAppId = srManager.coreService.getAppId(CoreService.CORE_APP_NAME);
Charles Chanc91c8782016-03-30 17:54:24 -0700259 this.srManager = srManager;
Charles Chanc91c8782016-03-30 17:54:24 -0700260 this.topologyService = srManager.topologyService;
Pier7b657162018-03-27 11:29:42 -0700261 KryoNamespace.Builder mcastKryo = new KryoNamespace.Builder()
Charles Chanc91c8782016-03-30 17:54:24 -0700262 .register(KryoNamespaces.API)
Piere99511d2018-04-19 16:47:06 +0200263 .register(new McastStoreKeySerializer(), McastStoreKey.class);
Pier7b657162018-03-27 11:29:42 -0700264 mcastNextObjStore = srManager.storageService
Charles Chan72779502016-04-23 17:36:10 -0700265 .<McastStoreKey, NextObjective>consistentMapBuilder()
Charles Chanc91c8782016-03-30 17:54:24 -0700266 .withName("onos-mcast-nextobj-store")
Charles Chan4922a172016-05-23 16:45:45 -0700267 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-NextObj")))
Charles Chanc91c8782016-03-30 17:54:24 -0700268 .build();
Piere99511d2018-04-19 16:47:06 +0200269 mcastKryo = new KryoNamespace.Builder()
270 .register(KryoNamespaces.API)
271 .register(new McastRoleStoreKeySerializer(), McastRoleStoreKey.class)
272 .register(McastRole.class);
Pier7b657162018-03-27 11:29:42 -0700273 mcastRoleStore = srManager.storageService
Piere99511d2018-04-19 16:47:06 +0200274 .<McastRoleStoreKey, McastRole>consistentMapBuilder()
Charles Chan72779502016-04-23 17:36:10 -0700275 .withName("onos-mcast-role-store")
Charles Chan4922a172016-05-23 16:45:45 -0700276 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-Role")))
Charles Chan72779502016-04-23 17:36:10 -0700277 .build();
Pier7b657162018-03-27 11:29:42 -0700278 mcastUtils = new McastUtils(srManager, coreAppId, log);
Piere99511d2018-04-19 16:47:06 +0200279 // Init the executor service, the buckets corrector and schedule the clean up
Pier Luigi35dab3f2018-01-25 16:16:02 +0100280 executorService.scheduleWithFixedDelay(new McastBucketCorrector(), 10,
Pier7b657162018-03-27 11:29:42 -0700281 MCAST_VERIFY_INTERVAL, TimeUnit.SECONDS);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100282 executorService.scheduleAtFixedRate(mcastEventCache::cleanUp, 0,
283 WAIT_TIME_MS, TimeUnit.MILLISECONDS);
Charles Chan72779502016-04-23 17:36:10 -0700284 }
285
286 /**
Piere99511d2018-04-19 16:47:06 +0200287 * Read initial multicast configuration from mcast store.
Charles Chan72779502016-04-23 17:36:10 -0700288 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100289 public void init() {
Pier7b657162018-03-27 11:29:42 -0700290 lastMcastChange = Instant.now();
291 mcastLock();
292 try {
293 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
Piere99511d2018-04-19 16:47:06 +0200294 log.debug("Init group {}", mcastRoute.group());
Pierdb27b8d2018-04-17 16:29:56 +0200295 if (!mcastUtils.isLeader(mcastRoute.group())) {
296 log.debug("Skip {} due to lack of leadership", mcastRoute.group());
297 return;
298 }
Pier7b657162018-03-27 11:29:42 -0700299 McastRouteData mcastRouteData = srManager.multicastRouteService.routeData(mcastRoute);
Piere99511d2018-04-19 16:47:06 +0200300 // For each source process the mcast tree
301 srManager.multicastRouteService.sources(mcastRoute).forEach(source -> {
302 Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
303 Set<DeviceId> visited = Sets.newHashSet();
304 List<ConnectPoint> currentPath = Lists.newArrayList(source);
Charles Chan0b1dd7e2018-08-19 19:21:46 -0700305 mcastUtils.buildMcastPaths(mcastNextObjStore.asJavaMap(), source.deviceId(), visited, mcastPaths,
Piere99511d2018-04-19 16:47:06 +0200306 currentPath, mcastRoute.group(), source);
307 // Get all the sinks and process them
308 Set<ConnectPoint> sinks = processSinksToBeAdded(source, mcastRoute.group(),
309 mcastRouteData.sinks());
310 // Filter out all the working sinks, we do not want to move them
311 // TODO we need a better way to distinguish flows coming from different sources
312 sinks = sinks.stream()
313 .filter(sink -> !mcastPaths.containsKey(sink) ||
314 !isSinkForSource(mcastRoute.group(), sink, source))
315 .collect(Collectors.toSet());
316 if (sinks.isEmpty()) {
317 log.debug("Skip {} for source {} nothing to do", mcastRoute.group(), source);
318 return;
319 }
320 Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(source.deviceId(), sinks);
321 mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink,
322 mcastRoute.group(), paths));
323 });
Pier7b657162018-03-27 11:29:42 -0700324 });
325 } finally {
326 mcastUnlock();
327 }
Charles Chanc91c8782016-03-30 17:54:24 -0700328 }
329
330 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +0100331 * Clean up when deactivating the application.
332 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100333 public void terminate() {
Pier72d0e582018-04-20 14:14:34 +0200334 mcastEventCache.invalidateAll();
Pier Luigi35dab3f2018-01-25 16:16:02 +0100335 executorService.shutdown();
Pier72d0e582018-04-20 14:14:34 +0200336 mcastNextObjStore.destroy();
337 mcastRoleStore.destroy();
338 mcastUtils.terminate();
339 log.info("Terminated");
Pier Luigi35dab3f2018-01-25 16:16:02 +0100340 }
341
342 /**
Pier Luigid29ca7c2018-02-28 17:24:03 +0100343 * Processes the SOURCE_ADDED, SOURCE_UPDATED, SINK_ADDED,
Piere99511d2018-04-19 16:47:06 +0200344 * SINK_REMOVED, ROUTE_ADDED and ROUTE_REMOVED events.
Charles Chanc91c8782016-03-30 17:54:24 -0700345 *
346 * @param event McastEvent with SOURCE_ADDED type
347 */
Pier Luigid29ca7c2018-02-28 17:24:03 +0100348 public void processMcastEvent(McastEvent event) {
Charles Chanba59dd62018-05-10 22:19:49 +0000349 log.info("process {}", event);
Pierdb27b8d2018-04-17 16:29:56 +0200350 // If it is a route added, we do not enqueue
351 if (event.type() == ROUTE_ADDED) {
Pierdb27b8d2018-04-17 16:29:56 +0200352 processRouteAddedInternal(event.subject().route().group());
353 } else {
Pierdb27b8d2018-04-17 16:29:56 +0200354 enqueueMcastEvent(event);
355 }
Pier Luigi6786b922018-02-02 16:19:11 +0100356 }
357
358 /**
Piere99511d2018-04-19 16:47:06 +0200359 * Process the SOURCES_ADDED event.
360 *
361 * @param sources the sources connect point
362 * @param mcastIp the group address
363 * @param sinks the sinks connect points
364 */
365 private void processSourcesAddedInternal(Set<ConnectPoint> sources, IpAddress mcastIp,
366 Map<HostId, Set<ConnectPoint>> sinks) {
367 lastMcastChange = Instant.now();
368 mcastLock();
369 try {
370 log.debug("Processing sources added {} for group {}", sources, mcastIp);
371 if (!mcastUtils.isLeader(mcastIp)) {
372 log.debug("Skip {} due to lack of leadership", mcastIp);
373 return;
374 }
375 sources.forEach(source -> {
376 Set<ConnectPoint> sinksToBeAdded = processSinksToBeAdded(source, mcastIp, sinks);
377 Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(source.deviceId(), sinksToBeAdded);
378 mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink, mcastIp, paths));
379 });
380 } finally {
381 mcastUnlock();
382 }
383 }
384
385 /**
386 * Process the SOURCES_REMOVED event.
387 *
388 * @param sourcesToBeRemoved the source connect points to be removed
389 * @param remainingSources the remainig source connect points
390 * @param mcastIp the group address
391 * @param sinks the sinks connect points
392 */
393 private void processSourcesRemovedInternal(Set<ConnectPoint> sourcesToBeRemoved,
394 Set<ConnectPoint> remainingSources,
395 IpAddress mcastIp,
396 Map<HostId, Set<ConnectPoint>> sinks) {
397 lastMcastChange = Instant.now();
398 mcastLock();
399 try {
400 log.debug("Processing sources removed {} for group {}", sourcesToBeRemoved, mcastIp);
401 if (!mcastUtils.isLeader(mcastIp)) {
402 log.debug("Skip {} due to lack of leadership", mcastIp);
403 return;
404 }
405 if (remainingSources.isEmpty()) {
406 processRouteRemovedInternal(sourcesToBeRemoved, mcastIp);
407 return;
408 }
409 // Skip offline devices
410 Set<ConnectPoint> candidateSources = sourcesToBeRemoved.stream()
411 .filter(source -> srManager.deviceService.isAvailable(source.deviceId()))
412 .collect(Collectors.toSet());
413 if (candidateSources.isEmpty()) {
414 log.debug("Skip {} due to empty sources to be removed", mcastIp);
415 return;
416 }
417 Set<Link> remainingLinks = Sets.newHashSet();
418 Map<ConnectPoint, Set<Link>> candidateLinks = Maps.newHashMap();
419 Map<ConnectPoint, Set<ConnectPoint>> candidateSinks = Maps.newHashMap();
420 Set<ConnectPoint> totalSources = Sets.newHashSet(candidateSources);
421 totalSources.addAll(remainingSources);
422 // Calculate all the links used by the sources
423 totalSources.forEach(source -> {
424 Set<ConnectPoint> currentSinks = sinks.values()
425 .stream().flatMap(Collection::stream)
426 .filter(sink -> isSinkForSource(mcastIp, sink, source))
427 .collect(Collectors.toSet());
428 candidateSinks.put(source, currentSinks);
429 currentSinks.forEach(currentSink -> {
430 Optional<Path> currentPath = getPath(source.deviceId(), currentSink.deviceId(),
431 mcastIp, null, source);
432 if (currentPath.isPresent()) {
433 if (!candidateSources.contains(source)) {
434 remainingLinks.addAll(currentPath.get().links());
435 } else {
436 candidateLinks.put(source, Sets.newHashSet(currentPath.get().links()));
437 }
438 }
439 });
440 });
441 // Clean transit links
442 candidateLinks.forEach((source, currentCandidateLinks) -> {
443 Set<Link> linksToBeRemoved = Sets.difference(currentCandidateLinks, remainingLinks)
444 .immutableCopy();
445 if (!linksToBeRemoved.isEmpty()) {
446 currentCandidateLinks.forEach(link -> {
447 DeviceId srcLink = link.src().deviceId();
448 // Remove ports only on links to be removed
449 if (linksToBeRemoved.contains(link)) {
450 removePortFromDevice(link.src().deviceId(), link.src().port(), mcastIp,
451 mcastUtils.assignedVlan(srcLink.equals(source.deviceId()) ?
452 source : null));
453 }
454 // Remove role on the candidate links
455 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, srcLink, source));
456 });
457 }
458 });
459 // Clean ingress and egress
460 candidateSources.forEach(source -> {
461 Set<ConnectPoint> currentSinks = candidateSinks.get(source);
462 currentSinks.forEach(currentSink -> {
463 VlanId assignedVlan = mcastUtils.assignedVlan(source.deviceId().equals(currentSink.deviceId()) ?
464 source : null);
465 // Sinks co-located with the source
466 if (source.deviceId().equals(currentSink.deviceId())) {
467 if (source.port().equals(currentSink.port())) {
468 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
469 mcastIp, currentSink, source);
470 return;
471 }
472 // We need to check against the other sources and if it is
473 // necessary remove the port from the device - no overlap
474 Set<VlanId> otherVlans = remainingSources.stream()
475 // Only sources co-located and having this sink
476 .filter(remainingSource -> remainingSource.deviceId()
477 .equals(source.deviceId()) && candidateSinks.get(remainingSource)
478 .contains(currentSink))
479 .map(remainingSource -> mcastUtils.assignedVlan(
480 remainingSource.deviceId().equals(currentSink.deviceId()) ?
481 remainingSource : null)).collect(Collectors.toSet());
482 if (!otherVlans.contains(assignedVlan)) {
483 removePortFromDevice(currentSink.deviceId(), currentSink.port(),
484 mcastIp, assignedVlan);
485 }
486 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, currentSink.deviceId(),
487 source));
488 return;
489 }
490 Set<VlanId> otherVlans = remainingSources.stream()
491 .filter(remainingSource -> candidateSinks.get(remainingSource)
492 .contains(currentSink))
493 .map(remainingSource -> mcastUtils.assignedVlan(
494 remainingSource.deviceId().equals(currentSink.deviceId()) ?
495 remainingSource : null)).collect(Collectors.toSet());
496 // Sinks on other leaves
497 if (!otherVlans.contains(assignedVlan)) {
498 removePortFromDevice(currentSink.deviceId(), currentSink.port(),
499 mcastIp, assignedVlan);
500 }
501 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, currentSink.deviceId(),
502 source));
503 });
504 });
505 } finally {
506 mcastUnlock();
507 }
508 }
509
510 /**
Pierdb27b8d2018-04-17 16:29:56 +0200511 * Process the ROUTE_ADDED event.
Pier Luigie80d6b42018-02-26 12:31:38 +0100512 *
Pierdb27b8d2018-04-17 16:29:56 +0200513 * @param mcastIp the group address
Pier Luigie80d6b42018-02-26 12:31:38 +0100514 */
Pierdb27b8d2018-04-17 16:29:56 +0200515 private void processRouteAddedInternal(IpAddress mcastIp) {
Pier Luigie80d6b42018-02-26 12:31:38 +0100516 lastMcastChange = Instant.now();
517 mcastLock();
518 try {
Pierdb27b8d2018-04-17 16:29:56 +0200519 log.debug("Processing route added for group {}", mcastIp);
520 // Just elect a new leader
521 mcastUtils.isLeader(mcastIp);
Pier Luigie80d6b42018-02-26 12:31:38 +0100522 } finally {
523 mcastUnlock();
524 }
525 }
526
527 /**
Pier Luigi6786b922018-02-02 16:19:11 +0100528 * Removes the entire mcast tree related to this group.
Piere99511d2018-04-19 16:47:06 +0200529 * @param sources the source connect points
Pier Luigi6786b922018-02-02 16:19:11 +0100530 * @param mcastIp multicast group IP address
531 */
Piere99511d2018-04-19 16:47:06 +0200532 private void processRouteRemovedInternal(Set<ConnectPoint> sources, IpAddress mcastIp) {
Pier Luigi6786b922018-02-02 16:19:11 +0100533 lastMcastChange = Instant.now();
534 mcastLock();
535 try {
Pier Luigie80d6b42018-02-26 12:31:38 +0100536 log.debug("Processing route removed for group {}", mcastIp);
Pierdb27b8d2018-04-17 16:29:56 +0200537 if (!mcastUtils.isLeader(mcastIp)) {
538 log.debug("Skip {} due to lack of leadership", mcastIp);
539 mcastUtils.withdrawLeader(mcastIp);
540 return;
541 }
Piere99511d2018-04-19 16:47:06 +0200542 sources.forEach(source -> {
543 // Find out the ingress, transit and egress device of the affected group
544 DeviceId ingressDevice = getDevice(mcastIp, INGRESS, source)
545 .stream().findFirst().orElse(null);
546 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
547 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
548 // If there are no egress and transit devices, sinks could be only on the ingress
549 if (!egressDevices.isEmpty()) {
550 egressDevices.forEach(deviceId -> {
551 removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null));
552 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, deviceId, source));
553 });
554 }
555 if (!transitDevices.isEmpty()) {
556 transitDevices.forEach(deviceId -> {
557 removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null));
558 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, deviceId, source));
559 });
560 }
561 if (ingressDevice != null) {
562 removeGroupFromDevice(ingressDevice, mcastIp, mcastUtils.assignedVlan(source));
563 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, ingressDevice, source));
564 }
565 });
566 // Finally, withdraw the leadership
567 mcastUtils.withdrawLeader(mcastIp);
Pier Luigi6786b922018-02-02 16:19:11 +0100568 } finally {
569 mcastUnlock();
570 }
571 }
572
Pier7b657162018-03-27 11:29:42 -0700573 /**
574 * Process sinks to be removed.
575 *
Piere99511d2018-04-19 16:47:06 +0200576 * @param sources the source connect points
Pier7b657162018-03-27 11:29:42 -0700577 * @param mcastIp the ip address of the group
578 * @param newSinks the new sinks to be processed
Pier28164682018-04-17 15:50:43 +0200579 * @param prevSinks the previous sinks
Pier7b657162018-03-27 11:29:42 -0700580 */
Piere99511d2018-04-19 16:47:06 +0200581 private void processSinksRemovedInternal(Set<ConnectPoint> sources, IpAddress mcastIp,
Pier7b657162018-03-27 11:29:42 -0700582 Map<HostId, Set<ConnectPoint>> newSinks,
Pier28164682018-04-17 15:50:43 +0200583 Map<HostId, Set<ConnectPoint>> prevSinks) {
Pier7b657162018-03-27 11:29:42 -0700584 lastMcastChange = Instant.now();
585 mcastLock();
Pier7b657162018-03-27 11:29:42 -0700586 try {
Pierdb27b8d2018-04-17 16:29:56 +0200587 if (!mcastUtils.isLeader(mcastIp)) {
588 log.debug("Skip {} due to lack of leadership", mcastIp);
589 return;
590 }
Piere99511d2018-04-19 16:47:06 +0200591 log.debug("Processing sinks removed for group {} and for sources {}",
592 mcastIp, sources);
593 Map<ConnectPoint, Map<ConnectPoint, Optional<Path>>> treesToBeRemoved = Maps.newHashMap();
594 Map<ConnectPoint, Set<ConnectPoint>> treesToBeAdded = Maps.newHashMap();
595 sources.forEach(source -> {
596 // Save the path associated to the sinks to be removed
597 Set<ConnectPoint> sinksToBeRemoved = processSinksToBeRemoved(mcastIp, prevSinks,
598 newSinks, source);
599 Map<ConnectPoint, Optional<Path>> treeToBeRemoved = Maps.newHashMap();
600 sinksToBeRemoved.forEach(sink -> treeToBeRemoved.put(sink, getPath(source.deviceId(),
601 sink.deviceId(), mcastIp,
602 null, source)));
603 treesToBeRemoved.put(source, treeToBeRemoved);
604 // Recover the dual-homed sinks
605 Set<ConnectPoint> sinksToBeRecovered = processSinksToBeRecovered(mcastIp, newSinks,
606 prevSinks, source);
607 treesToBeAdded.put(source, sinksToBeRecovered);
608 });
609 // Remove the sinks taking into account the multiple sources and the original paths
610 treesToBeRemoved.forEach((source, tree) ->
611 tree.forEach((sink, path) -> processSinkRemovedInternal(source, sink, mcastIp, path)));
612 // Add new sinks according to the recovery procedure
613 treesToBeAdded.forEach((source, sinks) ->
614 sinks.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null)));
Pier7b657162018-03-27 11:29:42 -0700615 } finally {
616 mcastUnlock();
Pier7b657162018-03-27 11:29:42 -0700617 }
618 }
619
Pier Luigi6786b922018-02-02 16:19:11 +0100620 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +0100621 * Removes a path from source to sink for given multicast group.
622 *
623 * @param source connect point of the multicast source
624 * @param sink connection point of the multicast sink
625 * @param mcastIp multicast group IP address
Piere99511d2018-04-19 16:47:06 +0200626 * @param mcastPath path associated to the sink
Pier Luigi35dab3f2018-01-25 16:16:02 +0100627 */
628 private void processSinkRemovedInternal(ConnectPoint source, ConnectPoint sink,
Piere99511d2018-04-19 16:47:06 +0200629 IpAddress mcastIp, Optional<Path> mcastPath) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100630 lastMcastChange = Instant.now();
631 mcastLock();
632 try {
Piere99511d2018-04-19 16:47:06 +0200633 log.debug("Processing sink removed {} for group {} and for source {}", sink, mcastIp, source);
Pier7b657162018-03-27 11:29:42 -0700634 boolean isLast;
Pier Luigi35dab3f2018-01-25 16:16:02 +0100635 // When source and sink are on the same device
636 if (source.deviceId().equals(sink.deviceId())) {
637 // Source and sink are on even the same port. There must be something wrong.
638 if (source.port().equals(sink.port())) {
Piere99511d2018-04-19 16:47:06 +0200639 log.warn("Skip {} since sink {} is on the same port of source {}. Abort", mcastIp, sink, source);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100640 return;
641 }
Pier7b657162018-03-27 11:29:42 -0700642 isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
Pier Luigi92e69be2018-03-02 12:53:37 +0100643 if (isLast) {
Piere99511d2018-04-19 16:47:06 +0200644 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, sink.deviceId(), source));
Pier Luigi92e69be2018-03-02 12:53:37 +0100645 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100646 return;
647 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100648 // Process the egress device
Pier7b657162018-03-27 11:29:42 -0700649 isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100650 if (isLast) {
Piere99511d2018-04-19 16:47:06 +0200651 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, sink.deviceId(), source));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100652 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100653 // If this is the last sink on the device, also update upstream
Pier Luigi35dab3f2018-01-25 16:16:02 +0100654 if (mcastPath.isPresent()) {
655 List<Link> links = Lists.newArrayList(mcastPath.get().links());
656 Collections.reverse(links);
657 for (Link link : links) {
658 if (isLast) {
Piere99511d2018-04-19 16:47:06 +0200659 isLast = removePortFromDevice(link.src().deviceId(), link.src().port(), mcastIp,
660 mcastUtils.assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
Pier Luigi92e69be2018-03-02 12:53:37 +0100661 if (isLast) {
Piere99511d2018-04-19 16:47:06 +0200662 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, link.src().deviceId(), source));
Pier Luigi92e69be2018-03-02 12:53:37 +0100663 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100664 }
Charles Chanc91c8782016-03-30 17:54:24 -0700665 }
666 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100667 } finally {
668 mcastUnlock();
Charles Chanc91c8782016-03-30 17:54:24 -0700669 }
670 }
671
Pier7b657162018-03-27 11:29:42 -0700672
673 /**
674 * Process sinks to be added.
675 *
Piere99511d2018-04-19 16:47:06 +0200676 * @param sources the source connect points
Pier7b657162018-03-27 11:29:42 -0700677 * @param mcastIp the group IP
678 * @param newSinks the new sinks to be processed
679 * @param allPrevSinks all previous sinks
680 */
Piere99511d2018-04-19 16:47:06 +0200681 private void processSinksAddedInternal(Set<ConnectPoint> sources, IpAddress mcastIp,
Pier7b657162018-03-27 11:29:42 -0700682 Map<HostId, Set<ConnectPoint>> newSinks,
683 Set<ConnectPoint> allPrevSinks) {
684 lastMcastChange = Instant.now();
685 mcastLock();
686 try {
Pierdb27b8d2018-04-17 16:29:56 +0200687 if (!mcastUtils.isLeader(mcastIp)) {
688 log.debug("Skip {} due to lack of leadership", mcastIp);
689 return;
690 }
Piere99511d2018-04-19 16:47:06 +0200691 log.debug("Processing sinks added for group {} and for sources {}", mcastIp, sources);
692 sources.forEach(source -> {
693 Set<ConnectPoint> sinksToBeAdded = processSinksToBeAdded(source, mcastIp, newSinks);
694 sinksToBeAdded = Sets.difference(sinksToBeAdded, allPrevSinks);
695 sinksToBeAdded.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null));
696 });
Pier7b657162018-03-27 11:29:42 -0700697 } finally {
698 mcastUnlock();
699 }
700 }
701
Charles Chanc91c8782016-03-30 17:54:24 -0700702 /**
703 * Establishes a path from source to sink for given multicast group.
704 *
705 * @param source connect point of the multicast source
706 * @param sink connection point of the multicast sink
707 * @param mcastIp multicast group IP address
708 */
709 private void processSinkAddedInternal(ConnectPoint source, ConnectPoint sink,
Pier7b657162018-03-27 11:29:42 -0700710 IpAddress mcastIp, List<Path> allPaths) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100711 lastMcastChange = Instant.now();
712 mcastLock();
713 try {
Piere99511d2018-04-19 16:47:06 +0200714 log.debug("Processing sink added {} for group {} and for source {}", sink, mcastIp, source);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100715 // Process the ingress device
Pier7b657162018-03-27 11:29:42 -0700716 mcastUtils.addFilterToDevice(source.deviceId(), source.port(),
Piere99511d2018-04-19 16:47:06 +0200717 mcastUtils.assignedVlan(source), mcastIp, INGRESS);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100718 if (source.deviceId().equals(sink.deviceId())) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100719 if (source.port().equals(sink.port())) {
720 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
721 mcastIp, sink, source);
722 return;
723 }
Pier7b657162018-03-27 11:29:42 -0700724 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
Piere99511d2018-04-19 16:47:06 +0200725 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, sink.deviceId(), source), INGRESS);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100726 return;
727 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100728 // Find a path. If present, create/update groups and flows for each hop
Piere99511d2018-04-19 16:47:06 +0200729 Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp, allPaths, source);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100730 if (mcastPath.isPresent()) {
731 List<Link> links = mcastPath.get().links();
Pier1a7e0c02018-03-12 15:00:54 -0700732 // Setup mcast role for ingress
Piere99511d2018-04-19 16:47:06 +0200733 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, source.deviceId(), source), INGRESS);
734 // Setup properly the transit forwarding
Pier Luigi35dab3f2018-01-25 16:16:02 +0100735 links.forEach(link -> {
736 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
Pier7b657162018-03-27 11:29:42 -0700737 mcastUtils.assignedVlan(link.src().deviceId()
738 .equals(source.deviceId()) ? source : null));
739 mcastUtils.addFilterToDevice(link.dst().deviceId(), link.dst().port(),
740 mcastUtils.assignedVlan(null), mcastIp, null);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100741 });
Pier1a7e0c02018-03-12 15:00:54 -0700742 // Setup mcast role for the transit
743 links.stream()
744 .filter(link -> !link.dst().deviceId().equals(sink.deviceId()))
Piere99511d2018-04-19 16:47:06 +0200745 .forEach(link -> mcastRoleStore.put(new McastRoleStoreKey(mcastIp, link.dst().deviceId(),
746 source), TRANSIT));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100747 // Process the egress device
Pier7b657162018-03-27 11:29:42 -0700748 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
Pier1a7e0c02018-03-12 15:00:54 -0700749 // Setup mcast role for egress
Piere99511d2018-04-19 16:47:06 +0200750 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, sink.deviceId(), source), EGRESS);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100751 } else {
Piere99511d2018-04-19 16:47:06 +0200752 log.warn("Unable to find a path from {} to {}. Abort sinkAdded", source.deviceId(), sink.deviceId());
Pier Luigi35dab3f2018-01-25 16:16:02 +0100753 }
754 } finally {
755 mcastUnlock();
Charles Chanc91c8782016-03-30 17:54:24 -0700756 }
757 }
758
759 /**
Charles Chan72779502016-04-23 17:36:10 -0700760 * Processes the LINK_DOWN event.
761 *
762 * @param affectedLink Link that is going down
763 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100764 public void processLinkDown(Link affectedLink) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100765 lastMcastChange = Instant.now();
766 mcastLock();
767 try {
768 // Get groups affected by the link down event
769 getAffectedGroups(affectedLink).forEach(mcastIp -> {
Piere99511d2018-04-19 16:47:06 +0200770 log.debug("Processing link down {} for group {}", affectedLink, mcastIp);
771 recoverFailure(mcastIp, affectedLink);
Charles Chan72779502016-04-23 17:36:10 -0700772 });
Pier Luigi35dab3f2018-01-25 16:16:02 +0100773 } finally {
774 mcastUnlock();
775 }
Charles Chan72779502016-04-23 17:36:10 -0700776 }
777
778 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +0100779 * Process the DEVICE_DOWN event.
780 *
781 * @param deviceDown device going down
782 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100783 public void processDeviceDown(DeviceId deviceDown) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100784 lastMcastChange = Instant.now();
785 mcastLock();
786 try {
787 // Get the mcast groups affected by the device going down
788 getAffectedGroups(deviceDown).forEach(mcastIp -> {
Piere99511d2018-04-19 16:47:06 +0200789 log.debug("Processing device down {} for group {}", deviceDown, mcastIp);
790 recoverFailure(mcastIp, deviceDown);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100791 });
792 } finally {
793 mcastUnlock();
794 }
Pier Luigi580fd8a2018-01-16 10:47:50 +0100795 }
796
797 /**
Piere99511d2018-04-19 16:47:06 +0200798 * General failure recovery procedure.
799 *
800 * @param mcastIp the group to recover
801 * @param failedElement the failed element
802 */
803 private void recoverFailure(IpAddress mcastIp, Object failedElement) {
804 // TODO Optimize when the group editing is in place
805 if (!mcastUtils.isLeader(mcastIp)) {
806 log.debug("Skip {} due to lack of leadership", mcastIp);
807 return;
808 }
809 // Do not proceed if the sources of this group are missing
810 Set<ConnectPoint> sources = getSources(mcastIp);
811 if (sources.isEmpty()) {
812 log.warn("Missing sources for group {}", mcastIp);
813 return;
814 }
815 // Find out the ingress devices of the affected group
816 // If sinks are in other leafs, we have ingress, transit, egress, and source
817 // If sinks are in the same leaf, we have just ingress and source
818 Set<DeviceId> ingressDevices = getDevice(mcastIp, INGRESS);
819 if (ingressDevices.isEmpty()) {
820 log.warn("Missing ingress devices for group {}", ingressDevices, mcastIp);
821 return;
822 }
823 // For each tree, delete ingress-transit part
824 sources.forEach(source -> {
825 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
826 transitDevices.forEach(transitDevice -> {
827 removeGroupFromDevice(transitDevice, mcastIp, mcastUtils.assignedVlan(null));
828 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, transitDevice, source));
829 });
830 });
831 removeIngressTransitPorts(mcastIp, ingressDevices, sources);
832 // TODO Evaluate the possibility of building optimize trees between sources
833 Map<DeviceId, Set<ConnectPoint>> notRecovered = Maps.newHashMap();
834 sources.forEach(source -> {
835 Set<DeviceId> notRecoveredInternal = Sets.newHashSet();
836 DeviceId ingressDevice = ingressDevices.stream()
837 .filter(deviceId -> deviceId.equals(source.deviceId())).findFirst().orElse(null);
838 // Clean also the ingress
839 if (failedElement instanceof DeviceId && ingressDevice.equals(failedElement)) {
840 removeGroupFromDevice((DeviceId) failedElement, mcastIp, mcastUtils.assignedVlan(source));
841 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, (DeviceId) failedElement, source));
842 }
843 if (ingressDevice == null) {
844 log.warn("Skip failure recovery - " +
845 "Missing ingress for source {} and group {}", source, mcastIp);
846 return;
847 }
848 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
849 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(ingressDevice, egressDevices);
850 // We have to verify, if there are egresses without paths
851 mcastTree.forEach((egressDevice, paths) -> {
852 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
853 mcastIp, paths, source);
854 // No paths, we have to try with alternative location
855 if (!mcastPath.isPresent()) {
856 notRecovered.compute(egressDevice, (deviceId, listSources) -> {
857 listSources = listSources == null ? Sets.newHashSet() : listSources;
858 listSources.add(source);
859 return listSources;
860 });
861 notRecoveredInternal.add(egressDevice);
862 }
863 });
864 // Fast path, we can recover all the locations
865 if (notRecoveredInternal.isEmpty()) {
866 mcastTree.forEach((egressDevice, paths) -> {
Charles Chanba59dd62018-05-10 22:19:49 +0000867 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
868 mcastIp, paths, source);
869 if (mcastPath.isPresent()) {
870 installPath(mcastIp, source, mcastPath.get());
871 }
Piere99511d2018-04-19 16:47:06 +0200872 });
873 } else {
874 // Let's try to recover using alternative locations
875 recoverSinks(egressDevices, notRecoveredInternal, mcastIp,
876 ingressDevice, source);
877 }
878 });
879 // Finally remove the egresses not recovered
880 notRecovered.forEach((egressDevice, listSources) -> {
881 Set<ConnectPoint> currentSources = getSources(mcastIp, egressDevice, EGRESS);
882 if (Objects.equal(currentSources, listSources)) {
883 log.warn("Fail to recover egress device {} from {} failure {}",
884 egressDevice, failedElement instanceof Link ? "Link" : "Device", failedElement);
885 removeGroupFromDevice(egressDevice, mcastIp, mcastUtils.assignedVlan(null));
886 }
887 listSources.forEach(source -> mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, egressDevice, source)));
888 });
889 }
890
891 /**
Pier7b657162018-03-27 11:29:42 -0700892 * Try to recover sinks using alternate locations.
893 *
894 * @param egressDevices the original egress devices
895 * @param notRecovered the devices not recovered
896 * @param mcastIp the group address
897 * @param ingressDevice the ingress device
898 * @param source the source connect point
Pier7b657162018-03-27 11:29:42 -0700899 */
900 private void recoverSinks(Set<DeviceId> egressDevices, Set<DeviceId> notRecovered,
Piere99511d2018-04-19 16:47:06 +0200901 IpAddress mcastIp, DeviceId ingressDevice, ConnectPoint source) {
902 log.debug("Processing recover sinks for group {} and for source {}",
903 mcastIp, source);
Pier7b657162018-03-27 11:29:42 -0700904 Set<DeviceId> recovered = Sets.difference(egressDevices, notRecovered);
Pier7b657162018-03-27 11:29:42 -0700905 Set<ConnectPoint> totalAffectedSinks = Sets.newHashSet();
Pier7b657162018-03-27 11:29:42 -0700906 Set<ConnectPoint> totalSinks = Sets.newHashSet();
907 // Let's compute all the affected sinks and all the sinks
908 notRecovered.forEach(deviceId -> {
909 totalAffectedSinks.addAll(
Charles Chanba59dd62018-05-10 22:19:49 +0000910 mcastUtils.getAffectedSinks(deviceId, mcastIp).values().stream()
911 .flatMap(Collection::stream)
Pier7b657162018-03-27 11:29:42 -0700912 .filter(connectPoint -> connectPoint.deviceId().equals(deviceId))
Charles Chanba59dd62018-05-10 22:19:49 +0000913 .collect(Collectors.toSet())
914 );
Pier7b657162018-03-27 11:29:42 -0700915 totalSinks.addAll(
Piere99511d2018-04-19 16:47:06 +0200916 mcastUtils.getAffectedSinks(deviceId, mcastIp).values().stream()
Charles Chanba59dd62018-05-10 22:19:49 +0000917 .flatMap(Collection::stream).collect(Collectors.toSet())
918 );
Pier7b657162018-03-27 11:29:42 -0700919 });
Pier7b657162018-03-27 11:29:42 -0700920 Set<ConnectPoint> sinksToBeAdded = Sets.difference(totalSinks, totalAffectedSinks);
Piere99511d2018-04-19 16:47:06 +0200921 Set<DeviceId> newEgressDevices = sinksToBeAdded.stream()
922 .map(ConnectPoint::deviceId).collect(Collectors.toSet());
923 newEgressDevices.addAll(recovered);
924 Set<DeviceId> copyNewEgressDevices = ImmutableSet.copyOf(newEgressDevices);
925 newEgressDevices = newEgressDevices.stream()
926 .filter(deviceId -> !deviceId.equals(ingressDevice)).collect(Collectors.toSet());
927 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(ingressDevice, newEgressDevices);
Pier7b657162018-03-27 11:29:42 -0700928 // if the source was originally in the new locations, add new sinks
Piere99511d2018-04-19 16:47:06 +0200929 if (copyNewEgressDevices.contains(ingressDevice)) {
Pier7b657162018-03-27 11:29:42 -0700930 sinksToBeAdded.stream()
931 .filter(connectPoint -> connectPoint.deviceId().equals(ingressDevice))
932 .forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, ImmutableList.of()));
933 }
Pier7b657162018-03-27 11:29:42 -0700934 // Construct a new path for each egress device
935 mcastTree.forEach((egressDevice, paths) -> {
Piere99511d2018-04-19 16:47:06 +0200936 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice, mcastIp, paths, source);
Pier7b657162018-03-27 11:29:42 -0700937 if (mcastPath.isPresent()) {
938 // Using recovery procedure
939 if (recovered.contains(egressDevice)) {
940 installPath(mcastIp, source, mcastPath.get());
941 } else {
942 // otherwise we need to threat as new sink
943 sinksToBeAdded.stream()
944 .filter(connectPoint -> connectPoint.deviceId().equals(egressDevice))
945 .forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, paths));
946 }
Pier7b657162018-03-27 11:29:42 -0700947 }
948 });
Pier7b657162018-03-27 11:29:42 -0700949 }
950
951 /**
Pier28164682018-04-17 15:50:43 +0200952 * Process all the sinks related to a mcast group and return
953 * the ones to be removed.
954 *
955 * @param mcastIp the group address
956 * @param prevsinks the previous sinks to be evaluated
957 * @param newSinks the new sinks to be evaluted
Piere99511d2018-04-19 16:47:06 +0200958 * @param source the source connect point
Pier28164682018-04-17 15:50:43 +0200959 * @return the set of the sinks to be removed
960 */
961 private Set<ConnectPoint> processSinksToBeRemoved(IpAddress mcastIp,
962 Map<HostId, Set<ConnectPoint>> prevsinks,
Piere99511d2018-04-19 16:47:06 +0200963 Map<HostId, Set<ConnectPoint>> newSinks,
964 ConnectPoint source) {
Pier28164682018-04-17 15:50:43 +0200965 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
966 prevsinks.forEach(((hostId, connectPoints) -> {
Shekhar Aryan27bbe2a2019-06-20 14:03:07 +0000967 if (Objects.equal(HostId.NONE, hostId)) {
Esin Karamanf1f46e32019-03-05 13:49:02 +0000968 //in this case connect points are single homed sinks.
969 //just found the difference btw previous and new sinks for this source.
970 Set<ConnectPoint> difference = Sets.difference(connectPoints, newSinks.get(hostId));
971 sinksToBeProcessed.addAll(difference);
972 return;
973 }
Pier28164682018-04-17 15:50:43 +0200974 // We have to check with the existing flows
975 ConnectPoint sinkToBeProcessed = connectPoints.stream()
Piere99511d2018-04-19 16:47:06 +0200976 .filter(connectPoint -> isSinkForSource(mcastIp, connectPoint, source))
Pier28164682018-04-17 15:50:43 +0200977 .findFirst().orElse(null);
978 if (sinkToBeProcessed != null) {
979 // If the host has been removed or location has been removed
980 if (!newSinks.containsKey(hostId) ||
981 !newSinks.get(hostId).contains(sinkToBeProcessed)) {
982 sinksToBeProcessed.add(sinkToBeProcessed);
983 }
984 }
985 }));
986 // We have done, return the set
987 return sinksToBeProcessed;
988 }
989
990 /**
Pier7b657162018-03-27 11:29:42 -0700991 * Process new locations and return the set of sinks to be added
992 * in the context of the recovery.
993 *
Pier28164682018-04-17 15:50:43 +0200994 * @param newSinks the remaining sinks
995 * @param prevSinks the previous sinks
Piere99511d2018-04-19 16:47:06 +0200996 * @param source the source connect point
Pier7b657162018-03-27 11:29:42 -0700997 * @return the set of the sinks to be processed
998 */
Charles Chanba59dd62018-05-10 22:19:49 +0000999 private Set<ConnectPoint> processSinksToBeRecovered(IpAddress mcastIp,
1000 Map<HostId, Set<ConnectPoint>> newSinks,
Piere99511d2018-04-19 16:47:06 +02001001 Map<HostId, Set<ConnectPoint>> prevSinks,
1002 ConnectPoint source) {
Pier7b657162018-03-27 11:29:42 -07001003 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
Pier28164682018-04-17 15:50:43 +02001004 newSinks.forEach((hostId, connectPoints) -> {
Pier7b657162018-03-27 11:29:42 -07001005 // If it has more than 1 locations
1006 if (connectPoints.size() > 1 || connectPoints.size() == 0) {
1007 log.debug("Skip {} since sink {} has {} locations",
1008 mcastIp, hostId, connectPoints.size());
1009 return;
1010 }
Pier28164682018-04-17 15:50:43 +02001011 // If previously it had two locations, we need to recover it
1012 // Filter out if the remaining location is already served
1013 if (prevSinks.containsKey(hostId) && prevSinks.get(hostId).size() == 2) {
Pier665b0fc2018-04-19 15:53:20 +02001014 ConnectPoint sinkToBeProcessed = connectPoints.stream()
Piere99511d2018-04-19 16:47:06 +02001015 .filter(connectPoint -> !isSinkForSource(mcastIp, connectPoint, source))
Pier665b0fc2018-04-19 15:53:20 +02001016 .findFirst().orElse(null);
1017 if (sinkToBeProcessed != null) {
1018 sinksToBeProcessed.add(sinkToBeProcessed);
1019 }
Pier28164682018-04-17 15:50:43 +02001020 }
Pier7b657162018-03-27 11:29:42 -07001021 });
1022 return sinksToBeProcessed;
1023 }
1024
1025 /**
1026 * Process all the sinks related to a mcast group and return
1027 * the ones to be processed.
1028 *
1029 * @param source the source connect point
1030 * @param mcastIp the group address
1031 * @param sinks the sinks to be evaluated
1032 * @return the set of the sinks to be processed
1033 */
1034 private Set<ConnectPoint> processSinksToBeAdded(ConnectPoint source, IpAddress mcastIp,
1035 Map<HostId, Set<ConnectPoint>> sinks) {
Pier7b657162018-03-27 11:29:42 -07001036 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
1037 sinks.forEach(((hostId, connectPoints) -> {
Esin Karamanf1f46e32019-03-05 13:49:02 +00001038 //add all connect points that are not tied with any host
Shekhar Aryan27bbe2a2019-06-20 14:03:07 +00001039 if (Objects.equal(HostId.NONE, hostId)) {
Esin Karamanf1f46e32019-03-05 13:49:02 +00001040 sinksToBeProcessed.addAll(connectPoints);
1041 return;
1042 }
Pier7b657162018-03-27 11:29:42 -07001043 // If it has more than 2 locations
1044 if (connectPoints.size() > 2 || connectPoints.size() == 0) {
1045 log.debug("Skip {} since sink {} has {} locations",
1046 mcastIp, hostId, connectPoints.size());
1047 return;
1048 }
1049 // If it has one location, just use it
1050 if (connectPoints.size() == 1) {
Piere99511d2018-04-19 16:47:06 +02001051 sinksToBeProcessed.add(connectPoints.stream().findFirst().orElse(null));
Pier7b657162018-03-27 11:29:42 -07001052 return;
1053 }
1054 // We prefer to reuse existing flows
1055 ConnectPoint sinkToBeProcessed = connectPoints.stream()
Piere99511d2018-04-19 16:47:06 +02001056 .filter(connectPoint -> {
1057 if (!isSinkForGroup(mcastIp, connectPoint, source)) {
1058 return false;
1059 }
1060 if (!isSinkReachable(mcastIp, connectPoint, source)) {
1061 return false;
1062 }
1063 ConnectPoint other = connectPoints.stream()
Charles Chanba59dd62018-05-10 22:19:49 +00001064 .filter(remaining -> !remaining.equals(connectPoint))
1065 .findFirst().orElse(null);
Piere99511d2018-04-19 16:47:06 +02001066 // We are already serving the sink
1067 return !isSinkForSource(mcastIp, other, source);
1068 }).findFirst().orElse(null);
1069
Pier7b657162018-03-27 11:29:42 -07001070 if (sinkToBeProcessed != null) {
1071 sinksToBeProcessed.add(sinkToBeProcessed);
1072 return;
1073 }
1074 // Otherwise we prefer to reuse existing egresses
Piere99511d2018-04-19 16:47:06 +02001075 Set<DeviceId> egresses = getDevice(mcastIp, EGRESS, source);
Pier7b657162018-03-27 11:29:42 -07001076 sinkToBeProcessed = connectPoints.stream()
Piere99511d2018-04-19 16:47:06 +02001077 .filter(connectPoint -> {
1078 if (!egresses.contains(connectPoint.deviceId())) {
1079 return false;
1080 }
1081 if (!isSinkReachable(mcastIp, connectPoint, source)) {
1082 return false;
1083 }
1084 ConnectPoint other = connectPoints.stream()
Charles Chanba59dd62018-05-10 22:19:49 +00001085 .filter(remaining -> !remaining.equals(connectPoint))
1086 .findFirst().orElse(null);
Piere99511d2018-04-19 16:47:06 +02001087 return !isSinkForSource(mcastIp, other, source);
1088 }).findFirst().orElse(null);
Pier7b657162018-03-27 11:29:42 -07001089 if (sinkToBeProcessed != null) {
1090 sinksToBeProcessed.add(sinkToBeProcessed);
1091 return;
1092 }
1093 // Otherwise we prefer a location co-located with the source (if it exists)
1094 sinkToBeProcessed = connectPoints.stream()
1095 .filter(connectPoint -> connectPoint.deviceId().equals(source.deviceId()))
1096 .findFirst().orElse(null);
1097 if (sinkToBeProcessed != null) {
1098 sinksToBeProcessed.add(sinkToBeProcessed);
1099 return;
1100 }
Piere99511d2018-04-19 16:47:06 +02001101 // Finally, we randomly pick a new location if it is reachable
1102 sinkToBeProcessed = connectPoints.stream()
1103 .filter(connectPoint -> {
1104 if (!isSinkReachable(mcastIp, connectPoint, source)) {
1105 return false;
1106 }
1107 ConnectPoint other = connectPoints.stream()
Charles Chanba59dd62018-05-10 22:19:49 +00001108 .filter(remaining -> !remaining.equals(connectPoint))
1109 .findFirst().orElse(null);
Piere99511d2018-04-19 16:47:06 +02001110 return !isSinkForSource(mcastIp, other, source);
1111 }).findFirst().orElse(null);
1112 if (sinkToBeProcessed != null) {
1113 sinksToBeProcessed.add(sinkToBeProcessed);
1114 }
Pier7b657162018-03-27 11:29:42 -07001115 }));
Pier7b657162018-03-27 11:29:42 -07001116 return sinksToBeProcessed;
1117 }
1118
1119 /**
Pier1a7e0c02018-03-12 15:00:54 -07001120 * Utility method to remove all the ingress transit ports.
1121 *
1122 * @param mcastIp the group ip
Piere99511d2018-04-19 16:47:06 +02001123 * @param ingressDevices the ingress devices
1124 * @param sources the source connect points
Pier1a7e0c02018-03-12 15:00:54 -07001125 */
Piere99511d2018-04-19 16:47:06 +02001126 private void removeIngressTransitPorts(IpAddress mcastIp, Set<DeviceId> ingressDevices,
1127 Set<ConnectPoint> sources) {
1128 Map<ConnectPoint, Set<PortNumber>> ingressTransitPorts = Maps.newHashMap();
1129 sources.forEach(source -> {
1130 DeviceId ingressDevice = ingressDevices.stream()
1131 .filter(deviceId -> deviceId.equals(source.deviceId()))
1132 .findFirst().orElse(null);
1133 if (ingressDevice == null) {
1134 log.warn("Skip removeIngressTransitPorts - " +
1135 "Missing ingress for source {} and group {}",
1136 source, mcastIp);
1137 return;
Pier1a7e0c02018-03-12 15:00:54 -07001138 }
Andrea Campanella5b4cd652018-06-05 14:19:21 +02001139 Set<PortNumber> ingressTransitPort = ingressTransitPort(mcastIp, ingressDevice, source);
1140 if (ingressTransitPort.isEmpty()) {
1141 log.warn("No transit ports to remove on device {}", ingressDevice);
1142 return;
1143 }
1144 ingressTransitPorts.put(source, ingressTransitPort);
Pier1a7e0c02018-03-12 15:00:54 -07001145 });
Piere99511d2018-04-19 16:47:06 +02001146 ingressTransitPorts.forEach((source, ports) -> ports.forEach(ingressTransitPort -> {
1147 DeviceId ingressDevice = ingressDevices.stream()
Charles Chanba59dd62018-05-10 22:19:49 +00001148 .filter(deviceId -> deviceId.equals(source.deviceId()))
1149 .findFirst().orElse(null);
Piere99511d2018-04-19 16:47:06 +02001150 boolean isLast = removePortFromDevice(ingressDevice, ingressTransitPort,
1151 mcastIp, mcastUtils.assignedVlan(source));
1152 if (isLast) {
1153 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, ingressDevice, source));
1154 }
1155 }));
Pier1a7e0c02018-03-12 15:00:54 -07001156 }
1157
1158 /**
Charles Chanc91c8782016-03-30 17:54:24 -07001159 * Adds a port to given multicast group on given device. This involves the
1160 * update of L3 multicast group and multicast routing table entry.
1161 *
1162 * @param deviceId device ID
1163 * @param port port to be added
1164 * @param mcastIp multicast group
1165 * @param assignedVlan assigned VLAN ID
1166 */
Charles Chanba59dd62018-05-10 22:19:49 +00001167 private void addPortToDevice(DeviceId deviceId, PortNumber port,
1168 IpAddress mcastIp, VlanId assignedVlan) {
Piere99511d2018-04-19 16:47:06 +02001169 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId, assignedVlan);
Charles Chanc91c8782016-03-30 17:54:24 -07001170 ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
Pier Luigi4f0dd212018-01-19 10:24:53 +01001171 NextObjective newNextObj;
Charles Chan72779502016-04-23 17:36:10 -07001172 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Charles Chanc91c8782016-03-30 17:54:24 -07001173 // First time someone request this mcast group via this device
1174 portBuilder.add(port);
Pier Luigi4f0dd212018-01-19 10:24:53 +01001175 // New nextObj
Vignesh Ethiraj75790122019-08-26 12:18:42 +00001176 if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
1177 log.debug("Passing 0 as nextId for unconfigured device {}", deviceId);
1178 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
1179 portBuilder.build(), 0).add();
1180 } else {
1181 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
1182 portBuilder.build(), null).add();
1183 }
Pier Luigi4f0dd212018-01-19 10:24:53 +01001184 // Store the new port
1185 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Charles Chanc91c8782016-03-30 17:54:24 -07001186 } else {
1187 // This device already serves some subscribers of this mcast group
Charles Chan72779502016-04-23 17:36:10 -07001188 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chanc91c8782016-03-30 17:54:24 -07001189 // Stop if the port is already in the nextobj
Pier7b657162018-03-27 11:29:42 -07001190 Set<PortNumber> existingPorts = mcastUtils.getPorts(nextObj.next());
Charles Chanc91c8782016-03-30 17:54:24 -07001191 if (existingPorts.contains(port)) {
1192 log.info("NextObj for {}/{} already exists. Abort", deviceId, port);
1193 return;
1194 }
Pier Luigi4f0dd212018-01-19 10:24:53 +01001195 // Let's add the port and reuse the previous one
Yuta HIGUCHIbef07b52018-02-09 18:05:23 -08001196 portBuilder.addAll(existingPorts).add(port);
Pier Luigi4f0dd212018-01-19 10:24:53 +01001197 // Reuse previous nextObj
Pier7b657162018-03-27 11:29:42 -07001198 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi4f0dd212018-01-19 10:24:53 +01001199 portBuilder.build(), nextObj.id()).addToExisting();
1200 // Store the final next objective and send only the difference to the driver
1201 mcastNextObjStore.put(mcastStoreKey, newNextObj);
1202 // Add just the new port
1203 portBuilder = ImmutableSet.builder();
1204 portBuilder.add(port);
Pier7b657162018-03-27 11:29:42 -07001205 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi4f0dd212018-01-19 10:24:53 +01001206 portBuilder.build(), nextObj.id()).addToExisting();
Charles Chanc91c8782016-03-30 17:54:24 -07001207 }
1208 // Create, store and apply the new nextObj and fwdObj
Charles Chan72779502016-04-23 17:36:10 -07001209 ObjectiveContext context = new DefaultObjectiveContext(
1210 (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}",
1211 mcastIp, deviceId, port.toLong(), assignedVlan),
Charles Chanfacfbef2018-08-23 14:30:33 -07001212 (objective, error) -> {
1213 log.warn("Failed to add {} on {}/{}, vlan {}: {}",
1214 mcastIp, deviceId, port.toLong(), assignedVlan, error);
1215 srManager.invalidateNextObj(objective.id());
1216 });
Pier7b657162018-03-27 11:29:42 -07001217 ForwardingObjective fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan,
1218 newNextObj.id()).add(context);
Vignesh Ethiraj75790122019-08-26 12:18:42 +00001219 if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
1220 log.debug("skip next and forward flowobjective addition for device: {}", deviceId);
1221 } else {
1222 srManager.flowObjectiveService.next(deviceId, newNextObj);
1223 srManager.flowObjectiveService.forward(deviceId, fwdObj);
1224 }
Charles Chanc91c8782016-03-30 17:54:24 -07001225 }
1226
1227 /**
1228 * Removes a port from given multicast group on given device.
1229 * This involves the update of L3 multicast group and multicast routing
1230 * table entry.
1231 *
1232 * @param deviceId device ID
1233 * @param port port to be added
1234 * @param mcastIp multicast group
1235 * @param assignedVlan assigned VLAN ID
1236 * @return true if this is the last sink on this device
1237 */
Charles Chanba59dd62018-05-10 22:19:49 +00001238 private boolean removePortFromDevice(DeviceId deviceId, PortNumber port,
1239 IpAddress mcastIp, VlanId assignedVlan) {
Charles Chan72779502016-04-23 17:36:10 -07001240 McastStoreKey mcastStoreKey =
Piere99511d2018-04-19 16:47:06 +02001241 new McastStoreKey(mcastIp, deviceId, assignedVlan);
Charles Chanc91c8782016-03-30 17:54:24 -07001242 // This device is not serving this multicast group
Charles Chan72779502016-04-23 17:36:10 -07001243 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Piere99511d2018-04-19 16:47:06 +02001244 return true;
Charles Chanc91c8782016-03-30 17:54:24 -07001245 }
Charles Chan72779502016-04-23 17:36:10 -07001246 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Pier7b657162018-03-27 11:29:42 -07001247 Set<PortNumber> existingPorts = mcastUtils.getPorts(nextObj.next());
Charles Chan72779502016-04-23 17:36:10 -07001248 // This port does not serve this multicast group
Charles Chanc91c8782016-03-30 17:54:24 -07001249 if (!existingPorts.contains(port)) {
Piere99511d2018-04-19 16:47:06 +02001250 if (!existingPorts.isEmpty()) {
1251 log.warn("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
1252 return false;
1253 }
1254 return true;
Charles Chanc91c8782016-03-30 17:54:24 -07001255 }
1256 // Copy and modify the ImmutableSet
1257 existingPorts = Sets.newHashSet(existingPorts);
1258 existingPorts.remove(port);
Charles Chanc91c8782016-03-30 17:54:24 -07001259 NextObjective newNextObj;
Pier Luigi8cd46de2018-01-19 10:24:53 +01001260 ObjectiveContext context;
Charles Chanc91c8782016-03-30 17:54:24 -07001261 ForwardingObjective fwdObj;
1262 if (existingPorts.isEmpty()) {
Pier Luigi8cd46de2018-01-19 10:24:53 +01001263 context = new DefaultObjectiveContext(
Charles Chan72779502016-04-23 17:36:10 -07001264 (objective) -> log.debug("Successfully remove {} on {}/{}, vlan {}",
1265 mcastIp, deviceId, port.toLong(), assignedVlan),
Piere99511d2018-04-19 16:47:06 +02001266 (objective, error) -> log.warn("Failed to remove {} on {}/{}, vlan {}: {}",
Charles Chan72779502016-04-23 17:36:10 -07001267 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Pier7b657162018-03-27 11:29:42 -07001268 fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
Charles Chan72779502016-04-23 17:36:10 -07001269 mcastNextObjStore.remove(mcastStoreKey);
Charles Chanc91c8782016-03-30 17:54:24 -07001270 } else {
1271 // If this is not the last sink, update flows and groups
Pier Luigi8cd46de2018-01-19 10:24:53 +01001272 context = new DefaultObjectiveContext(
Charles Chan72779502016-04-23 17:36:10 -07001273 (objective) -> log.debug("Successfully update {} on {}/{}, vlan {}",
1274 mcastIp, deviceId, port.toLong(), assignedVlan),
Charles Chanfacfbef2018-08-23 14:30:33 -07001275 (objective, error) -> {
1276 log.warn("Failed to update {} on {}/{}, vlan {}: {}",
1277 mcastIp, deviceId, port.toLong(), assignedVlan, error);
1278 srManager.invalidateNextObj(objective.id());
1279 });
Pier Luigi8cd46de2018-01-19 10:24:53 +01001280 // Here we store the next objective with the remaining port
Pier7b657162018-03-27 11:29:42 -07001281 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi8cd46de2018-01-19 10:24:53 +01001282 existingPorts, nextObj.id()).removeFromExisting();
Pier7b657162018-03-27 11:29:42 -07001283 fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add(context);
Charles Chan72779502016-04-23 17:36:10 -07001284 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Charles Chanc91c8782016-03-30 17:54:24 -07001285 }
Pier Luigi8cd46de2018-01-19 10:24:53 +01001286 // Let's modify the next objective removing the bucket
Pier7b657162018-03-27 11:29:42 -07001287 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi8cd46de2018-01-19 10:24:53 +01001288 ImmutableSet.of(port), nextObj.id()).removeFromExisting();
Vignesh Ethiraj75790122019-08-26 12:18:42 +00001289 if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
1290 log.debug("skip forward and next flow objectives from adding flows on device: {}", deviceId);
1291 } else {
1292 srManager.flowObjectiveService.next(deviceId, newNextObj);
1293 srManager.flowObjectiveService.forward(deviceId, fwdObj);
1294 }
Charles Chanc91c8782016-03-30 17:54:24 -07001295 return existingPorts.isEmpty();
1296 }
1297
Charles Chan72779502016-04-23 17:36:10 -07001298 /**
1299 * Removes entire group on given device.
1300 *
1301 * @param deviceId device ID
1302 * @param mcastIp multicast group to be removed
1303 * @param assignedVlan assigned VLAN ID
1304 */
Charles Chanba59dd62018-05-10 22:19:49 +00001305 private void removeGroupFromDevice(DeviceId deviceId, IpAddress mcastIp,
1306 VlanId assignedVlan) {
Piere99511d2018-04-19 16:47:06 +02001307 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId, assignedVlan);
Charles Chan72779502016-04-23 17:36:10 -07001308 // This device is not serving this multicast group
1309 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
1310 log.warn("{} is not serving {}. Abort.", deviceId, mcastIp);
1311 return;
1312 }
1313 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chan72779502016-04-23 17:36:10 -07001314 ObjectiveContext context = new DefaultObjectiveContext(
1315 (objective) -> log.debug("Successfully remove {} on {}, vlan {}",
1316 mcastIp, deviceId, assignedVlan),
Piere99511d2018-04-19 16:47:06 +02001317 (objective, error) -> log.warn("Failed to remove {} on {}, vlan {}: {}",
Charles Chan72779502016-04-23 17:36:10 -07001318 mcastIp, deviceId, assignedVlan, error));
Vignesh Ethiraj75790122019-08-26 12:18:42 +00001319 if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
1320 log.debug("skip flow changes on unconfigured device: {}", deviceId);
1321 } else {
1322 ForwardingObjective fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
1323 srManager.flowObjectiveService.forward(deviceId, fwdObj);
1324 }
Charles Chan72779502016-04-23 17:36:10 -07001325 mcastNextObjStore.remove(mcastStoreKey);
Charles Chan72779502016-04-23 17:36:10 -07001326 }
1327
Pier Luigi580fd8a2018-01-16 10:47:50 +01001328 private void installPath(IpAddress mcastIp, ConnectPoint source, Path mcastPath) {
Pier Luigi580fd8a2018-01-16 10:47:50 +01001329 List<Link> links = mcastPath.links();
kezhiyong168fbba2018-12-03 16:14:29 +08001330 if (links.isEmpty()) {
1331 log.warn("There is no link that can be used. Stopping installation.");
1332 return;
1333 }
Pier1a7e0c02018-03-12 15:00:54 -07001334 // Setup new ingress mcast role
Piere99511d2018-04-19 16:47:06 +02001335 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, links.get(0).src().deviceId(), source),
Pier1a7e0c02018-03-12 15:00:54 -07001336 INGRESS);
Pier Luigi580fd8a2018-01-16 10:47:50 +01001337 // For each link, modify the next on the source device adding the src port
1338 // and a new filter objective on the destination port
1339 links.forEach(link -> {
1340 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
Pier7b657162018-03-27 11:29:42 -07001341 mcastUtils.assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
1342 mcastUtils.addFilterToDevice(link.dst().deviceId(), link.dst().port(),
1343 mcastUtils.assignedVlan(null), mcastIp, null);
Pier Luigi580fd8a2018-01-16 10:47:50 +01001344 });
Pier1a7e0c02018-03-12 15:00:54 -07001345 // Setup mcast role for the transit
1346 links.stream()
1347 .filter(link -> !link.src().deviceId().equals(source.deviceId()))
Piere99511d2018-04-19 16:47:06 +02001348 .forEach(link -> mcastRoleStore.put(new McastRoleStoreKey(mcastIp, link.src().deviceId(), source),
Pier1a7e0c02018-03-12 15:00:54 -07001349 TRANSIT));
Charles Chan72779502016-04-23 17:36:10 -07001350 }
1351
Charles Chanc91c8782016-03-30 17:54:24 -07001352 /**
Pier1f87aca2018-03-14 16:47:32 -07001353 * Go through all the paths, looking for shared links to be used
1354 * in the final path computation.
1355 *
1356 * @param egresses egress devices
1357 * @param availablePaths all the available paths towards the egress
1358 * @return shared links between egress devices
1359 */
Charles Chanba59dd62018-05-10 22:19:49 +00001360 private Set<Link> exploreMcastTree(Set<DeviceId> egresses,
1361 Map<DeviceId, List<Path>> availablePaths) {
Pier1f87aca2018-03-14 16:47:32 -07001362 int minLength = Integer.MAX_VALUE;
1363 int length;
Pier1f87aca2018-03-14 16:47:32 -07001364 List<Path> currentPaths;
1365 // Verify the source can still reach all the egresses
1366 for (DeviceId egress : egresses) {
1367 // From the source we cannot reach all the sinks
Pier7b657162018-03-27 11:29:42 -07001368 // just continue and let's figure out after
Pier1f87aca2018-03-14 16:47:32 -07001369 currentPaths = availablePaths.get(egress);
1370 if (currentPaths.isEmpty()) {
1371 continue;
1372 }
Piere99511d2018-04-19 16:47:06 +02001373 // Get the length of the first one available, update the min length
Pier1f87aca2018-03-14 16:47:32 -07001374 length = currentPaths.get(0).links().size();
1375 if (length < minLength) {
1376 minLength = length;
1377 }
Pier Luigi51ee7c02018-02-23 19:57:40 +01001378 }
Pier1f87aca2018-03-14 16:47:32 -07001379 // If there are no paths
1380 if (minLength == Integer.MAX_VALUE) {
1381 return Collections.emptySet();
1382 }
Pier1f87aca2018-03-14 16:47:32 -07001383 int index = 0;
Pier1f87aca2018-03-14 16:47:32 -07001384 Set<Link> sharedLinks = Sets.newHashSet();
1385 Set<Link> currentSharedLinks;
1386 Set<Link> currentLinks;
Pier7b657162018-03-27 11:29:42 -07001387 DeviceId egressToRemove = null;
Pier1f87aca2018-03-14 16:47:32 -07001388 // Let's find out the shared links
1389 while (index < minLength) {
1390 // Initialize the intersection with the paths related to the first egress
Piere99511d2018-04-19 16:47:06 +02001391 currentPaths = availablePaths.get(egresses.stream().findFirst().orElse(null));
Pier1f87aca2018-03-14 16:47:32 -07001392 currentSharedLinks = Sets.newHashSet();
1393 // Iterate over the paths and take the "index" links
1394 for (Path path : currentPaths) {
1395 currentSharedLinks.add(path.links().get(index));
1396 }
1397 // Iterate over the remaining egress
1398 for (DeviceId egress : egresses) {
1399 // Iterate over the paths and take the "index" links
1400 currentLinks = Sets.newHashSet();
1401 for (Path path : availablePaths.get(egress)) {
1402 currentLinks.add(path.links().get(index));
1403 }
1404 // Do intersection
1405 currentSharedLinks = Sets.intersection(currentSharedLinks, currentLinks);
1406 // If there are no shared paths exit and record the device to remove
1407 // we have to retry with a subset of sinks
1408 if (currentSharedLinks.isEmpty()) {
Pier7b657162018-03-27 11:29:42 -07001409 egressToRemove = egress;
Pier1f87aca2018-03-14 16:47:32 -07001410 index = minLength;
1411 break;
1412 }
1413 }
1414 sharedLinks.addAll(currentSharedLinks);
1415 index++;
1416 }
Piere99511d2018-04-19 16:47:06 +02001417 // If the shared links is empty and there are egress let's retry another time with less sinks,
1418 // we can still build optimal subtrees
Pier7b657162018-03-27 11:29:42 -07001419 if (sharedLinks.isEmpty() && egresses.size() > 1 && egressToRemove != null) {
1420 egresses.remove(egressToRemove);
Pier1f87aca2018-03-14 16:47:32 -07001421 sharedLinks = exploreMcastTree(egresses, availablePaths);
1422 }
1423 return sharedLinks;
1424 }
1425
1426 /**
1427 * Build Mcast tree having as root the given source and as leaves the given egress points.
1428 *
1429 * @param source source of the tree
1430 * @param sinks leaves of the tree
1431 * @return the computed Mcast tree
1432 */
Charles Chanba59dd62018-05-10 22:19:49 +00001433 private Map<ConnectPoint, List<Path>> computeSinkMcastTree(DeviceId source,
1434 Set<ConnectPoint> sinks) {
Pier1f87aca2018-03-14 16:47:32 -07001435 // Get the egress devices, remove source from the egress if present
Piere99511d2018-04-19 16:47:06 +02001436 Set<DeviceId> egresses = sinks.stream().map(ConnectPoint::deviceId)
1437 .filter(deviceId -> !deviceId.equals(source)).collect(Collectors.toSet());
Pier1f87aca2018-03-14 16:47:32 -07001438 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(source, egresses);
Pier1f87aca2018-03-14 16:47:32 -07001439 final Map<ConnectPoint, List<Path>> finalTree = Maps.newHashMap();
Pier7b657162018-03-27 11:29:42 -07001440 // We need to put back the source if it was originally present
1441 sinks.forEach(sink -> {
1442 List<Path> sinkPaths = mcastTree.get(sink.deviceId());
1443 finalTree.put(sink, sinkPaths != null ? sinkPaths : ImmutableList.of());
1444 });
Pier1f87aca2018-03-14 16:47:32 -07001445 return finalTree;
1446 }
1447
1448 /**
1449 * Build Mcast tree having as root the given source and as leaves the given egress.
1450 *
1451 * @param source source of the tree
1452 * @param egresses leaves of the tree
1453 * @return the computed Mcast tree
1454 */
1455 private Map<DeviceId, List<Path>> computeMcastTree(DeviceId source,
1456 Set<DeviceId> egresses) {
1457 // Pre-compute all the paths
1458 Map<DeviceId, List<Path>> availablePaths = Maps.newHashMap();
Pier1f87aca2018-03-14 16:47:32 -07001459 egresses.forEach(egress -> availablePaths.put(egress, getPaths(source, egress,
1460 Collections.emptySet())));
1461 // Explore the topology looking for shared links amongst the egresses
1462 Set<Link> linksToEnforce = exploreMcastTree(Sets.newHashSet(egresses), availablePaths);
Pier1f87aca2018-03-14 16:47:32 -07001463 // Build the final paths enforcing the shared links between egress devices
Piere99511d2018-04-19 16:47:06 +02001464 availablePaths.clear();
Pier1f87aca2018-03-14 16:47:32 -07001465 egresses.forEach(egress -> availablePaths.put(egress, getPaths(source, egress,
1466 linksToEnforce)));
1467 return availablePaths;
1468 }
1469
1470 /**
1471 * Gets path from src to dst computed using the custom link weigher.
1472 *
1473 * @param src source device ID
1474 * @param dst destination device ID
1475 * @return list of paths from src to dst
1476 */
1477 private List<Path> getPaths(DeviceId src, DeviceId dst, Set<Link> linksToEnforce) {
Pier1f87aca2018-03-14 16:47:32 -07001478 final Topology currentTopology = topologyService.currentTopology();
Pier1f87aca2018-03-14 16:47:32 -07001479 final LinkWeigher linkWeigher = new SRLinkWeigher(srManager, src, linksToEnforce);
Piere99511d2018-04-19 16:47:06 +02001480 List<Path> allPaths = Lists.newArrayList(topologyService.getPaths(currentTopology, src, dst, linkWeigher));
Pier1f87aca2018-03-14 16:47:32 -07001481 log.debug("{} path(s) found from {} to {}", allPaths.size(), src, dst);
1482 return allPaths;
Pier Luigi51ee7c02018-02-23 19:57:40 +01001483 }
1484
Charles Chanc91c8782016-03-30 17:54:24 -07001485 /**
1486 * Gets a path from src to dst.
1487 * If a path was allocated before, returns the allocated path.
1488 * Otherwise, randomly pick one from available paths.
1489 *
1490 * @param src source device ID
1491 * @param dst destination device ID
1492 * @param mcastIp multicast group
Pier1f87aca2018-03-14 16:47:32 -07001493 * @param allPaths paths list
Charles Chanc91c8782016-03-30 17:54:24 -07001494 * @return an optional path from src to dst
1495 */
Piere99511d2018-04-19 16:47:06 +02001496 private Optional<Path> getPath(DeviceId src, DeviceId dst, IpAddress mcastIp,
1497 List<Path> allPaths, ConnectPoint source) {
Pier1f87aca2018-03-14 16:47:32 -07001498 if (allPaths == null) {
1499 allPaths = getPaths(src, dst, Collections.emptySet());
1500 }
Charles Chanc91c8782016-03-30 17:54:24 -07001501 if (allPaths.isEmpty()) {
Charles Chanc91c8782016-03-30 17:54:24 -07001502 return Optional.empty();
1503 }
Piere99511d2018-04-19 16:47:06 +02001504 // Create a map index of suitability-to-list of paths. For example
Pier Luigi91573e12018-01-23 16:06:38 +01001505 // a path in the list associated to the index 1 shares only the
1506 // first hop and it is less suitable of a path belonging to the index
1507 // 2 that shares leaf-spine.
1508 Map<Integer, List<Path>> eligiblePaths = Maps.newHashMap();
Pier Luigi91573e12018-01-23 16:06:38 +01001509 int nhop;
1510 McastStoreKey mcastStoreKey;
Pier Luigi91573e12018-01-23 16:06:38 +01001511 PortNumber srcPort;
1512 Set<PortNumber> existingPorts;
1513 NextObjective nextObj;
Pier Luigi91573e12018-01-23 16:06:38 +01001514 for (Path path : allPaths) {
Pier Luigi91573e12018-01-23 16:06:38 +01001515 if (!src.equals(path.links().get(0).src().deviceId())) {
1516 continue;
1517 }
1518 nhop = 0;
1519 // Iterate over the links
Piere99511d2018-04-19 16:47:06 +02001520 for (Link hop : path.links()) {
1521 VlanId assignedVlan = mcastUtils.assignedVlan(hop.src().deviceId().equals(src) ?
1522 source : null);
1523 mcastStoreKey = new McastStoreKey(mcastIp, hop.src().deviceId(), assignedVlan);
1524 // It does not exist in the store, go to the next link
Pier Luigi91573e12018-01-23 16:06:38 +01001525 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Piere99511d2018-04-19 16:47:06 +02001526 continue;
Charles Chanc91c8782016-03-30 17:54:24 -07001527 }
Pier Luigi91573e12018-01-23 16:06:38 +01001528 nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Pier7b657162018-03-27 11:29:42 -07001529 existingPorts = mcastUtils.getPorts(nextObj.next());
Pier Luigi91573e12018-01-23 16:06:38 +01001530 srcPort = hop.src().port();
Piere99511d2018-04-19 16:47:06 +02001531 // the src port is not used as output, go to the next link
Pier Luigi91573e12018-01-23 16:06:38 +01001532 if (!existingPorts.contains(srcPort)) {
Piere99511d2018-04-19 16:47:06 +02001533 continue;
Pier Luigi91573e12018-01-23 16:06:38 +01001534 }
1535 nhop++;
1536 }
1537 // n_hop defines the index
1538 if (nhop > 0) {
1539 eligiblePaths.compute(nhop, (index, paths) -> {
1540 paths = paths == null ? Lists.newArrayList() : paths;
1541 paths.add(path);
1542 return paths;
1543 });
Charles Chanc91c8782016-03-30 17:54:24 -07001544 }
1545 }
Pier Luigi91573e12018-01-23 16:06:38 +01001546 if (eligiblePaths.isEmpty()) {
1547 log.debug("No eligiblePath(s) found from {} to {}", src, dst);
Pier Luigi91573e12018-01-23 16:06:38 +01001548 Collections.shuffle(allPaths);
1549 return allPaths.stream().findFirst();
1550 }
Pier Luigi91573e12018-01-23 16:06:38 +01001551 // Let's take the best ones
Piere99511d2018-04-19 16:47:06 +02001552 Integer bestIndex = eligiblePaths.keySet().stream()
1553 .sorted(Comparator.reverseOrder()).findFirst().orElse(null);
Pier Luigi91573e12018-01-23 16:06:38 +01001554 List<Path> bestPaths = eligiblePaths.get(bestIndex);
1555 log.debug("{} eligiblePath(s) found from {} to {}",
1556 bestPaths.size(), src, dst);
Pier Luigi91573e12018-01-23 16:06:38 +01001557 Collections.shuffle(bestPaths);
1558 return bestPaths.stream().findFirst();
Charles Chanc91c8782016-03-30 17:54:24 -07001559 }
1560
1561 /**
Piere99511d2018-04-19 16:47:06 +02001562 * Gets device(s) of given role and of given source in given multicast tree.
1563 *
1564 * @param mcastIp multicast IP
1565 * @param role multicast role
1566 * @param source source connect point
1567 * @return set of device ID or empty set if not found
1568 */
1569 private Set<DeviceId> getDevice(IpAddress mcastIp, McastRole role, ConnectPoint source) {
1570 return mcastRoleStore.entrySet().stream()
1571 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
Charles Chanba59dd62018-05-10 22:19:49 +00001572 entry.getKey().source().equals(source) &&
1573 entry.getValue().value() == role)
Piere99511d2018-04-19 16:47:06 +02001574 .map(Entry::getKey).map(McastRoleStoreKey::deviceId).collect(Collectors.toSet());
1575 }
1576
1577 /**
Charles Chan72779502016-04-23 17:36:10 -07001578 * Gets device(s) of given role in given multicast group.
1579 *
1580 * @param mcastIp multicast IP
1581 * @param role multicast role
1582 * @return set of device ID or empty set if not found
1583 */
1584 private Set<DeviceId> getDevice(IpAddress mcastIp, McastRole role) {
1585 return mcastRoleStore.entrySet().stream()
1586 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
1587 entry.getValue().value() == role)
Piere99511d2018-04-19 16:47:06 +02001588 .map(Entry::getKey).map(McastRoleStoreKey::deviceId).collect(Collectors.toSet());
1589 }
1590
1591 /**
1592 * Gets source(s) of given role, given device in given multicast group.
1593 *
1594 * @param mcastIp multicast IP
1595 * @param deviceId device id
1596 * @param role multicast role
1597 * @return set of device ID or empty set if not found
1598 */
1599 private Set<ConnectPoint> getSources(IpAddress mcastIp, DeviceId deviceId, McastRole role) {
1600 return mcastRoleStore.entrySet().stream()
1601 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
1602 entry.getKey().deviceId().equals(deviceId) && entry.getValue().value() == role)
1603 .map(Entry::getKey).map(McastRoleStoreKey::source).collect(Collectors.toSet());
1604 }
1605
1606 /**
1607 * Gets source(s) of given multicast group.
1608 *
1609 * @param mcastIp multicast IP
1610 * @return set of device ID or empty set if not found
1611 */
1612 private Set<ConnectPoint> getSources(IpAddress mcastIp) {
1613 return mcastRoleStore.entrySet().stream()
1614 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp))
1615 .map(Entry::getKey).map(McastRoleStoreKey::source).collect(Collectors.toSet());
Charles Chan72779502016-04-23 17:36:10 -07001616 }
1617
1618 /**
1619 * Gets groups which is affected by the link down event.
1620 *
1621 * @param link link going down
1622 * @return a set of multicast IpAddress
1623 */
1624 private Set<IpAddress> getAffectedGroups(Link link) {
1625 DeviceId deviceId = link.src().deviceId();
1626 PortNumber port = link.src().port();
1627 return mcastNextObjStore.entrySet().stream()
1628 .filter(entry -> entry.getKey().deviceId().equals(deviceId) &&
Piere99511d2018-04-19 16:47:06 +02001629 mcastUtils.getPorts(entry.getValue().value().next()).contains(port))
1630 .map(Entry::getKey).map(McastStoreKey::mcastIp).collect(Collectors.toSet());
Charles Chan72779502016-04-23 17:36:10 -07001631 }
1632
1633 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +01001634 * Gets groups which are affected by the device down event.
1635 *
1636 * @param deviceId device going down
1637 * @return a set of multicast IpAddress
1638 */
1639 private Set<IpAddress> getAffectedGroups(DeviceId deviceId) {
1640 return mcastNextObjStore.entrySet().stream()
1641 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
Pier1f87aca2018-03-14 16:47:32 -07001642 .map(Entry::getKey).map(McastStoreKey::mcastIp)
Pier Luigi580fd8a2018-01-16 10:47:50 +01001643 .collect(Collectors.toSet());
1644 }
1645
1646 /**
Charles Chan72779502016-04-23 17:36:10 -07001647 * Gets the spine-facing port on ingress device of given multicast group.
1648 *
1649 * @param mcastIp multicast IP
Piere99511d2018-04-19 16:47:06 +02001650 * @param ingressDevice the ingress device
1651 * @param source the source connect point
Charles Chan72779502016-04-23 17:36:10 -07001652 * @return spine-facing port on ingress device
1653 */
Charles Chanba59dd62018-05-10 22:19:49 +00001654 private Set<PortNumber> ingressTransitPort(IpAddress mcastIp, DeviceId ingressDevice,
1655 ConnectPoint source) {
Pier1a7e0c02018-03-12 15:00:54 -07001656 ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
Charles Chan72779502016-04-23 17:36:10 -07001657 if (ingressDevice != null) {
Andrea Campanella5b4cd652018-06-05 14:19:21 +02001658 Versioned<NextObjective> nextObjVers = mcastNextObjStore.get(new McastStoreKey(mcastIp, ingressDevice,
1659 mcastUtils.assignedVlan(source)));
1660 if (nextObjVers == null) {
1661 log.warn("Absent next objective for {}", new McastStoreKey(mcastIp, ingressDevice,
1662 mcastUtils.assignedVlan(source)));
1663 return portBuilder.build();
1664 }
1665 NextObjective nextObj = nextObjVers.value();
Pier7b657162018-03-27 11:29:42 -07001666 Set<PortNumber> ports = mcastUtils.getPorts(nextObj.next());
Pier1a7e0c02018-03-12 15:00:54 -07001667 // Let's find out all the ingress-transit ports
Charles Chan72779502016-04-23 17:36:10 -07001668 for (PortNumber port : ports) {
1669 // Spine-facing port should have no subnet and no xconnect
Pier Luigi69f774d2018-02-28 12:10:50 +01001670 if (srManager.deviceConfiguration() != null &&
1671 srManager.deviceConfiguration().getPortSubnets(ingressDevice, port).isEmpty() &&
Charles Chan8d316332018-06-19 20:31:57 -07001672 (srManager.xconnectService == null ||
1673 !srManager.xconnectService.hasXconnect(new ConnectPoint(ingressDevice, port)))) {
Pier1a7e0c02018-03-12 15:00:54 -07001674 portBuilder.add(port);
Charles Chan72779502016-04-23 17:36:10 -07001675 }
1676 }
1677 }
Pier1a7e0c02018-03-12 15:00:54 -07001678 return portBuilder.build();
Charles Chan72779502016-04-23 17:36:10 -07001679 }
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001680
1681 /**
Pier28164682018-04-17 15:50:43 +02001682 * Verify if a given connect point is sink for this group.
1683 *
1684 * @param mcastIp group address
1685 * @param connectPoint connect point to be verified
Piere99511d2018-04-19 16:47:06 +02001686 * @param source source connect point
Pier28164682018-04-17 15:50:43 +02001687 * @return true if the connect point is sink of the group
1688 */
Charles Chanba59dd62018-05-10 22:19:49 +00001689 private boolean isSinkForGroup(IpAddress mcastIp, ConnectPoint connectPoint,
1690 ConnectPoint source) {
Piere99511d2018-04-19 16:47:06 +02001691 VlanId assignedVlan = mcastUtils.assignedVlan(connectPoint.deviceId().equals(source.deviceId()) ?
1692 source : null);
1693 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, connectPoint.deviceId(), assignedVlan);
Pier28164682018-04-17 15:50:43 +02001694 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
1695 return false;
1696 }
Pier28164682018-04-17 15:50:43 +02001697 NextObjective mcastNext = mcastNextObjStore.get(mcastStoreKey).value();
1698 return mcastUtils.getPorts(mcastNext.next()).contains(connectPoint.port());
1699 }
1700
1701 /**
Piere99511d2018-04-19 16:47:06 +02001702 * Verify if a given connect point is sink for this group and for this source.
1703 *
1704 * @param mcastIp group address
1705 * @param connectPoint connect point to be verified
1706 * @param source source connect point
1707 * @return true if the connect point is sink of the group
1708 */
Charles Chanba59dd62018-05-10 22:19:49 +00001709 private boolean isSinkForSource(IpAddress mcastIp, ConnectPoint connectPoint,
1710 ConnectPoint source) {
Piere99511d2018-04-19 16:47:06 +02001711 boolean isSink = isSinkForGroup(mcastIp, connectPoint, source);
1712 DeviceId device;
1713 if (connectPoint.deviceId().equals(source.deviceId())) {
1714 device = getDevice(mcastIp, INGRESS, source).stream()
1715 .filter(deviceId -> deviceId.equals(connectPoint.deviceId()))
1716 .findFirst().orElse(null);
1717 } else {
1718 device = getDevice(mcastIp, EGRESS, source).stream()
1719 .filter(deviceId -> deviceId.equals(connectPoint.deviceId()))
1720 .findFirst().orElse(null);
1721 }
1722 return isSink && device != null;
1723 }
1724
1725 /**
1726 * Verify if a sink is reachable from this source.
1727 *
1728 * @param mcastIp group address
1729 * @param sink connect point to be verified
1730 * @param source source connect point
1731 * @return true if the connect point is reachable from the source
1732 */
Charles Chanba59dd62018-05-10 22:19:49 +00001733 private boolean isSinkReachable(IpAddress mcastIp, ConnectPoint sink,
1734 ConnectPoint source) {
Piere99511d2018-04-19 16:47:06 +02001735 return sink.deviceId().equals(source.deviceId()) ||
1736 getPath(source.deviceId(), sink.deviceId(), mcastIp, null, source).isPresent();
1737 }
1738
1739 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +01001740 * Updates filtering objective for given device and port.
1741 * It is called in general when the mcast config has been
1742 * changed.
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001743 *
1744 * @param deviceId device ID
1745 * @param portNum ingress port number
1746 * @param vlanId assigned VLAN ID
1747 * @param install true to add, false to remove
1748 */
Charles Chanba59dd62018-05-10 22:19:49 +00001749 public void updateFilterToDevice(DeviceId deviceId, PortNumber portNum,
1750 VlanId vlanId, boolean install) {
Pier Luigi35dab3f2018-01-25 16:16:02 +01001751 lastMcastChange = Instant.now();
1752 mcastLock();
1753 try {
Piere99511d2018-04-19 16:47:06 +02001754 // Iterates over the route and updates properly the filtering objective on the source device.
Pier Luigi35dab3f2018-01-25 16:16:02 +01001755 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
Pierdb27b8d2018-04-17 16:29:56 +02001756 log.debug("Update filter for {}", mcastRoute.group());
Pierdb27b8d2018-04-17 16:29:56 +02001757 if (!mcastUtils.isLeader(mcastRoute.group())) {
1758 log.debug("Skip {} due to lack of leadership", mcastRoute.group());
1759 return;
1760 }
Piere99511d2018-04-19 16:47:06 +02001761 // Get the sources and for each one update properly the filtering objectives
1762 Set<ConnectPoint> sources = srManager.multicastRouteService.sources(mcastRoute);
1763 sources.forEach(source -> {
1764 if (source.deviceId().equals(deviceId) && source.port().equals(portNum)) {
1765 if (install) {
1766 mcastUtils.addFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), INGRESS);
1767 } else {
1768 mcastUtils.removeFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), null);
1769 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001770 }
Piere99511d2018-04-19 16:47:06 +02001771 });
Pier Luigi35dab3f2018-01-25 16:16:02 +01001772 });
1773 } finally {
1774 mcastUnlock();
1775 }
1776 }
1777
1778 /**
1779 * Performs bucket verification operation for all mcast groups in the devices.
1780 * Firstly, it verifies that mcast is stable before trying verification operation.
1781 * Verification consists in creating new nexts with VERIFY operation. Actually,
1782 * the operation is totally delegated to the driver.
1783 */
Piere99511d2018-04-19 16:47:06 +02001784 private final class McastBucketCorrector implements Runnable {
Pier Luigi35dab3f2018-01-25 16:16:02 +01001785
pierc32ef422020-01-27 17:45:03 +01001786 private static final int MAX_VERIFY_ON_FLIGHT = 10;
1787 private final AtomicInteger verifyOnFlight = new AtomicInteger(0);
1788 // Define the context used for the back pressure mechanism
1789 private final ObjectiveContext context = new DefaultObjectiveContext(
1790 (objective) -> {
1791 synchronized (verifyOnFlight) {
1792 verifyOnFlight.decrementAndGet();
1793 verifyOnFlight.notify();
1794 }
1795 },
1796 (objective, error) -> {
1797 synchronized (verifyOnFlight) {
1798 verifyOnFlight.decrementAndGet();
1799 verifyOnFlight.notify();
1800 }
1801 });
1802
Pier Luigi35dab3f2018-01-25 16:16:02 +01001803 @Override
1804 public void run() {
pierc32ef422020-01-27 17:45:03 +01001805 if (!isMcastStable() || wasBktCorrRunning()) {
Pier Luigi35dab3f2018-01-25 16:16:02 +01001806 return;
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001807 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001808 mcastLock();
1809 try {
1810 // Iterates over the routes and verify the related next objectives
pierc32ef422020-01-27 17:45:03 +01001811 for (McastRoute mcastRoute : srManager.multicastRouteService.getRoutes()) {
1812 IpAddress mcastIp = mcastRoute.group();
1813 log.trace("Running mcast buckets corrector for mcast group: {}", mcastIp);
1814 // Verify leadership on the operation
1815 if (!mcastUtils.isLeader(mcastIp)) {
1816 log.trace("Skip {} due to lack of leadership", mcastIp);
1817 continue;
1818 }
1819 // Get sources and sinks from Mcast Route Service and warn about errors
1820 Set<ConnectPoint> sources = mcastUtils.getSources(mcastIp);
1821 Set<ConnectPoint> sinks = mcastUtils.getSinks(mcastIp).values().stream()
1822 .flatMap(Collection::stream).collect(Collectors.toSet());
1823 // Do not proceed if sources of this group are missing
1824 if (sources.isEmpty()) {
1825 if (!sinks.isEmpty()) {
1826 log.warn("Unable to run buckets corrector. " +
1827 "Missing source {} for group {}", sources, mcastIp);
Piere99511d2018-04-19 16:47:06 +02001828 }
pierc32ef422020-01-27 17:45:03 +01001829 continue;
1830 }
1831 // For each group we get current information in the store
1832 // and issue a check of the next objectives in place
1833 Set<McastStoreKey> processedKeys = Sets.newHashSet();
1834 for (ConnectPoint source : sources) {
1835 Set<DeviceId> ingressDevices = getDevice(mcastIp, INGRESS, source);
1836 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
1837 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
1838 // Do not proceed if ingress devices are missing
1839 if (ingressDevices.isEmpty()) {
Pier Luigi92e69be2018-03-02 12:53:37 +01001840 if (!sinks.isEmpty()) {
1841 log.warn("Unable to run buckets corrector. " +
pierc32ef422020-01-27 17:45:03 +01001842 "Missing ingress {} for source {} and for group {}",
1843 ingressDevices, source, mcastIp);
Pier Luigi92e69be2018-03-02 12:53:37 +01001844 }
pierc32ef422020-01-27 17:45:03 +01001845 continue;
Pier Luigi35dab3f2018-01-25 16:16:02 +01001846 }
pierc32ef422020-01-27 17:45:03 +01001847 // Create the set of the devices to be processed
1848 ImmutableSet.Builder<DeviceId> devicesBuilder = ImmutableSet.builder();
1849 if (!ingressDevices.isEmpty()) {
1850 devicesBuilder.addAll(ingressDevices);
1851 }
1852 if (!transitDevices.isEmpty()) {
1853 devicesBuilder.addAll(transitDevices);
1854 }
1855 if (!egressDevices.isEmpty()) {
1856 devicesBuilder.addAll(egressDevices);
1857 }
1858 Set<DeviceId> devicesToProcess = devicesBuilder.build();
1859 for (DeviceId deviceId : devicesToProcess) {
1860 if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
1861 log.trace("Skipping Bucket corrector for unconfigured device {}", deviceId);
Piere99511d2018-04-19 16:47:06 +02001862 return;
Pier Luigi35dab3f2018-01-25 16:16:02 +01001863 }
pierc32ef422020-01-27 17:45:03 +01001864 synchronized (verifyOnFlight) {
1865 while (verifyOnFlight.get() == MAX_VERIFY_ON_FLIGHT) {
1866 verifyOnFlight.wait();
Vignesh Ethiraj75790122019-08-26 12:18:42 +00001867 }
pierc32ef422020-01-27 17:45:03 +01001868 }
1869 VlanId assignedVlan = mcastUtils.assignedVlan(deviceId.equals(source.deviceId()) ?
1870 source : null);
1871 McastStoreKey currentKey = new McastStoreKey(mcastIp, deviceId, assignedVlan);
1872 // Check if we already processed this next - trees merge at some point
1873 if (processedKeys.contains(currentKey)) {
1874 continue;
1875 }
1876 // Verify the nextobjective or skip to next device
1877 if (mcastNextObjStore.containsKey(currentKey)) {
1878 NextObjective currentNext = mcastNextObjStore.get(currentKey).value();
1879 // Rebuild the next objective using assigned vlan
1880 currentNext = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
1881 mcastUtils.getPorts(currentNext.next()), currentNext.id()).verify(context);
1882 // Send to the flowobjective service
1883 srManager.flowObjectiveService.next(deviceId, currentNext);
1884 verifyOnFlight.incrementAndGet();
1885 log.trace("Verify on flight {}", verifyOnFlight);
1886 processedKeys.add(currentKey);
1887 } else {
1888 log.warn("Unable to run buckets corrector. " +
1889 "Missing next for {}, for source {} and for group {}",
1890 deviceId, source, mcastIp);
1891 }
1892 }
1893 }
1894 }
1895 } catch (InterruptedException e) {
1896 log.warn("BktCorr has been interrupted");
Pier Luigi35dab3f2018-01-25 16:16:02 +01001897 } finally {
pierc32ef422020-01-27 17:45:03 +01001898 lastBktCorrExecution = Instant.now();
Pier Luigi35dab3f2018-01-25 16:16:02 +01001899 mcastUnlock();
1900 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001901 }
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001902 }
Pier Luigi0f9635b2018-01-15 18:06:43 +01001903
Piere99511d2018-04-19 16:47:06 +02001904 /**
1905 * Returns the associated next ids to the mcast groups or to the single
1906 * group if mcastIp is present.
1907 *
1908 * @param mcastIp the group ip
1909 * @return the mapping mcastIp-device to next id
1910 */
Charles Chan0b1dd7e2018-08-19 19:21:46 -07001911 public Map<McastStoreKey, Integer> getNextIds(IpAddress mcastIp) {
Pier Luigi0f9635b2018-01-15 18:06:43 +01001912 if (mcastIp != null) {
1913 return mcastNextObjStore.entrySet().stream()
1914 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
Piere99511d2018-04-19 16:47:06 +02001915 .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().value().id()));
Pier Luigi0f9635b2018-01-15 18:06:43 +01001916 }
Pier Luigi0f9635b2018-01-15 18:06:43 +01001917 return mcastNextObjStore.entrySet().stream()
Piere99511d2018-04-19 16:47:06 +02001918 .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().value().id()));
Pier Luigi0f9635b2018-01-15 18:06:43 +01001919 }
1920
Pier71c55772018-04-17 17:25:22 +02001921 /**
Charles Chan0b1dd7e2018-08-19 19:21:46 -07001922 * Removes given next ID from mcast next id store.
1923 *
1924 * @param nextId next id
1925 */
1926 public void removeNextId(int nextId) {
1927 mcastNextObjStore.entrySet().forEach(e -> {
1928 if (e.getValue().value().id() == nextId) {
1929 mcastNextObjStore.remove(e.getKey());
1930 }
1931 });
1932 }
1933
1934 /**
Piere99511d2018-04-19 16:47:06 +02001935 * Returns the associated roles to the mcast groups.
1936 *
1937 * @param mcastIp the group ip
1938 * @param sourcecp the source connect point
1939 * @return the mapping mcastIp-device to mcast role
1940 */
Charles Chanba59dd62018-05-10 22:19:49 +00001941 public Map<McastRoleStoreKey, McastRole> getMcastRoles(IpAddress mcastIp,
1942 ConnectPoint sourcecp) {
Piere99511d2018-04-19 16:47:06 +02001943 if (mcastIp != null) {
1944 Map<McastRoleStoreKey, McastRole> roles = mcastRoleStore.entrySet().stream()
1945 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
1946 .collect(Collectors.toMap(entry -> new McastRoleStoreKey(entry.getKey().mcastIp(),
1947 entry.getKey().deviceId(), entry.getKey().source()), entry -> entry.getValue().value()));
1948 if (sourcecp != null) {
1949 roles = roles.entrySet().stream()
1950 .filter(mcastEntry -> sourcecp.equals(mcastEntry.getKey().source()))
1951 .collect(Collectors.toMap(entry -> new McastRoleStoreKey(entry.getKey().mcastIp(),
1952 entry.getKey().deviceId(), entry.getKey().source()), Entry::getValue));
1953 }
1954 return roles;
1955 }
1956 return mcastRoleStore.entrySet().stream()
1957 .collect(Collectors.toMap(entry -> new McastRoleStoreKey(entry.getKey().mcastIp(),
1958 entry.getKey().deviceId(), entry.getKey().source()), entry -> entry.getValue().value()));
1959 }
1960
Pier71c55772018-04-17 17:25:22 +02001961 /**
1962 * Returns the associated trees to the mcast group.
1963 *
1964 * @param mcastIp the group ip
1965 * @param sourcecp the source connect point
1966 * @return the mapping egress point to mcast path
1967 */
Charles Chanba59dd62018-05-10 22:19:49 +00001968 public Multimap<ConnectPoint, List<ConnectPoint>> getMcastTrees(IpAddress mcastIp,
1969 ConnectPoint sourcecp) {
Pier71c55772018-04-17 17:25:22 +02001970 Multimap<ConnectPoint, List<ConnectPoint>> mcastTrees = HashMultimap.create();
Pier71c55772018-04-17 17:25:22 +02001971 Set<ConnectPoint> sources = mcastUtils.getSources(mcastIp);
Pier71c55772018-04-17 17:25:22 +02001972 if (sourcecp != null) {
1973 sources = sources.stream()
Piere99511d2018-04-19 16:47:06 +02001974 .filter(source -> source.equals(sourcecp)).collect(Collectors.toSet());
Pier71c55772018-04-17 17:25:22 +02001975 }
Pier71c55772018-04-17 17:25:22 +02001976 if (!sources.isEmpty()) {
1977 sources.forEach(source -> {
Pier71c55772018-04-17 17:25:22 +02001978 Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
1979 Set<DeviceId> visited = Sets.newHashSet();
1980 List<ConnectPoint> currentPath = Lists.newArrayList(source);
Charles Chan0b1dd7e2018-08-19 19:21:46 -07001981 mcastUtils.buildMcastPaths(mcastNextObjStore.asJavaMap(), source.deviceId(), visited, mcastPaths,
1982 currentPath, mcastIp, source);
Pier71c55772018-04-17 17:25:22 +02001983 mcastPaths.forEach(mcastTrees::put);
1984 });
1985 }
1986 return mcastTrees;
1987 }
1988
1989 /**
Pierdb27b8d2018-04-17 16:29:56 +02001990 * Return the leaders of the mcast groups.
1991 *
1992 * @param mcastIp the group ip
1993 * @return the mapping group-node
1994 */
1995 public Map<IpAddress, NodeId> getMcastLeaders(IpAddress mcastIp) {
1996 return mcastUtils.getMcastLeaders(mcastIp);
1997 }
Charles Chanc91c8782016-03-30 17:54:24 -07001998}