blob: 55334132ef6a183e7fc8d204d3b6f83017242e87 [file] [log] [blame]
Charles Chanc91c8782016-03-30 17:54:24 -07001/*
Pier Luigi69f774d2018-02-28 12:10:50 +01002 * Copyright 2018-present Open Networking Foundation
Charles Chanc91c8782016-03-30 17:54:24 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Pier Luigi69f774d2018-02-28 12:10:50 +010017package org.onosproject.segmentrouting.mcast;
Charles Chanc91c8782016-03-30 17:54:24 -070018
Piere99511d2018-04-19 16:47:06 +020019import com.google.common.base.Objects;
Pier71c55772018-04-17 17:25:22 +020020import com.google.common.collect.HashMultimap;
Pier7b657162018-03-27 11:29:42 -070021import com.google.common.collect.ImmutableList;
Charles Chanc91c8782016-03-30 17:54:24 -070022import com.google.common.collect.ImmutableSet;
23import com.google.common.collect.Lists;
Pier Luigi91573e12018-01-23 16:06:38 +010024import com.google.common.collect.Maps;
Pier71c55772018-04-17 17:25:22 +020025import com.google.common.collect.Multimap;
Charles Chanc91c8782016-03-30 17:54:24 -070026import com.google.common.collect.Sets;
Charles Chanc91c8782016-03-30 17:54:24 -070027import org.onlab.packet.IpAddress;
Charles Chanc91c8782016-03-30 17:54:24 -070028import org.onlab.packet.VlanId;
29import org.onlab.util.KryoNamespace;
Pierdb27b8d2018-04-17 16:29:56 +020030import org.onosproject.cluster.NodeId;
Charles Chanc91c8782016-03-30 17:54:24 -070031import org.onosproject.core.ApplicationId;
32import org.onosproject.core.CoreService;
Pier1f87aca2018-03-14 16:47:32 -070033import org.onosproject.mcast.api.McastEvent;
34import org.onosproject.mcast.api.McastRoute;
Pier7b657162018-03-27 11:29:42 -070035import org.onosproject.mcast.api.McastRouteData;
Pier1f87aca2018-03-14 16:47:32 -070036import org.onosproject.mcast.api.McastRouteUpdate;
Harshada Chaundkar9204f312019-07-02 16:01:24 +000037import org.onosproject.net.Device;
Charles Chanba59dd62018-05-10 22:19:49 +000038import org.onosproject.net.HostId;
Charles Chanc91c8782016-03-30 17:54:24 -070039import org.onosproject.net.ConnectPoint;
40import org.onosproject.net.DeviceId;
41import org.onosproject.net.Link;
42import org.onosproject.net.Path;
Harshada Chaundkar9204f312019-07-02 16:01:24 +000043import org.onosproject.net.Port;
Charles Chanc91c8782016-03-30 17:54:24 -070044import org.onosproject.net.PortNumber;
Charles Chan72779502016-04-23 17:36:10 -070045import org.onosproject.net.flowobjective.DefaultObjectiveContext;
Charles Chanc91c8782016-03-30 17:54:24 -070046import org.onosproject.net.flowobjective.ForwardingObjective;
47import org.onosproject.net.flowobjective.NextObjective;
Charles Chan72779502016-04-23 17:36:10 -070048import org.onosproject.net.flowobjective.ObjectiveContext;
Pier1f87aca2018-03-14 16:47:32 -070049import org.onosproject.net.topology.LinkWeigher;
Pier Luigid8a15162018-02-15 16:33:08 +010050import org.onosproject.net.topology.Topology;
Charles Chanc91c8782016-03-30 17:54:24 -070051import org.onosproject.net.topology.TopologyService;
Pier1f87aca2018-03-14 16:47:32 -070052import org.onosproject.segmentrouting.SRLinkWeigher;
Pier Luigi69f774d2018-02-28 12:10:50 +010053import org.onosproject.segmentrouting.SegmentRoutingManager;
Charles Chanc91c8782016-03-30 17:54:24 -070054import org.onosproject.store.serializers.KryoNamespaces;
55import org.onosproject.store.service.ConsistentMap;
Harshada Chaundkar9204f312019-07-02 16:01:24 +000056import org.onosproject.store.service.DistributedSet;
Charles Chanc91c8782016-03-30 17:54:24 -070057import org.onosproject.store.service.Serializer;
Andrea Campanella5b4cd652018-06-05 14:19:21 +020058import org.onosproject.store.service.Versioned;
Charles Chanc91c8782016-03-30 17:54:24 -070059import org.slf4j.Logger;
60import org.slf4j.LoggerFactory;
61
Pier Luigi35dab3f2018-01-25 16:16:02 +010062import java.time.Instant;
Charles Chanc91c8782016-03-30 17:54:24 -070063import java.util.Collection;
64import java.util.Collections;
Pier Luigi91573e12018-01-23 16:06:38 +010065import java.util.Comparator;
Harshada Chaundkar9204f312019-07-02 16:01:24 +000066import java.util.Iterator;
Charles Chanc91c8782016-03-30 17:54:24 -070067import java.util.List;
Charles Chan72779502016-04-23 17:36:10 -070068import java.util.Map;
Pier1f87aca2018-03-14 16:47:32 -070069import java.util.Map.Entry;
Charles Chanc91c8782016-03-30 17:54:24 -070070import java.util.Optional;
71import java.util.Set;
Pier Luigi35dab3f2018-01-25 16:16:02 +010072import java.util.concurrent.ScheduledExecutorService;
73import java.util.concurrent.TimeUnit;
pierc32ef422020-01-27 17:45:03 +010074import java.util.concurrent.atomic.AtomicInteger;
pier62e0b072019-12-23 19:21:49 +010075import java.util.concurrent.atomic.AtomicReference;
Charles Chan72779502016-04-23 17:36:10 -070076import java.util.stream.Collectors;
77
Pier Luigi35dab3f2018-01-25 16:16:02 +010078import static java.util.concurrent.Executors.newScheduledThreadPool;
79import static org.onlab.util.Tools.groupedThreads;
Charles Chanba59dd62018-05-10 22:19:49 +000080
Pierdb27b8d2018-04-17 16:29:56 +020081import static org.onosproject.mcast.api.McastEvent.Type.ROUTE_ADDED;
Pier7b657162018-03-27 11:29:42 -070082import static org.onosproject.mcast.api.McastEvent.Type.ROUTE_REMOVED;
Andrea Campanellaef30d7a2018-04-27 14:44:15 +020083import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_ADDED;
84import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_REMOVED;
Charles Chanba59dd62018-05-10 22:19:49 +000085import static org.onosproject.mcast.api.McastEvent.Type.SINKS_ADDED;
86import static org.onosproject.mcast.api.McastEvent.Type.SINKS_REMOVED;
87
Pier979e61a2018-03-07 11:42:50 +010088import static org.onosproject.segmentrouting.mcast.McastRole.EGRESS;
89import static org.onosproject.segmentrouting.mcast.McastRole.INGRESS;
90import static org.onosproject.segmentrouting.mcast.McastRole.TRANSIT;
Charles Chanc91c8782016-03-30 17:54:24 -070091
92/**
Pier Luigi69f774d2018-02-28 12:10:50 +010093 * Handles Multicast related events.
Charles Chanc91c8782016-03-30 17:54:24 -070094 */
Charles Chan1eaf4802016-04-18 13:44:03 -070095public class McastHandler {
pier62e0b072019-12-23 19:21:49 +010096 // Internal elements
Charles Chan1eaf4802016-04-18 13:44:03 -070097 private static final Logger log = LoggerFactory.getLogger(McastHandler.class);
Charles Chanc91c8782016-03-30 17:54:24 -070098 private final SegmentRoutingManager srManager;
Charles Chan82f19972016-05-17 13:13:55 -070099 private final TopologyService topologyService;
Pierdb27b8d2018-04-17 16:29:56 +0200100 private final McastUtils mcastUtils;
Charles Chan72779502016-04-23 17:36:10 -0700101 private final ConsistentMap<McastStoreKey, NextObjective> mcastNextObjStore;
Piere99511d2018-04-19 16:47:06 +0200102 private final ConsistentMap<McastRoleStoreKey, McastRole> mcastRoleStore;
Harshada Chaundkar9204f312019-07-02 16:01:24 +0000103 private final DistributedSet<McastFilteringObjStoreKey> mcastFilteringObjStore;
Pier Luigi35dab3f2018-01-25 16:16:02 +0100104 // Stability threshold for Mcast. Seconds
105 private static final long MCAST_STABLITY_THRESHOLD = 5;
Piere99511d2018-04-19 16:47:06 +0200106 // Verify interval for Mcast bucket corrector
Pier Luigi35dab3f2018-01-25 16:16:02 +0100107 private static final long MCAST_VERIFY_INTERVAL = 30;
pier62e0b072019-12-23 19:21:49 +0100108 // Max verify that can be processed at the same time
109 private static final int MAX_VERIFY_ON_FLIGHT = 10;
110 // Last change done
111 private AtomicReference<Instant> lastMcastChange = new AtomicReference<>(Instant.now());
112 // Last bucker corrector execution
113 private AtomicReference<Instant> lastBktCorrExecution = new AtomicReference<>(Instant.now());
114 // Executors for mcast bucket corrector and for the events
115 private ScheduledExecutorService mcastCorrector
116 = newScheduledThreadPool(1, groupedThreads("onos", "m-corrector", log));
117 private ScheduledExecutorService mcastWorker
118 = newScheduledThreadPool(1, groupedThreads("onos", "m-worker-%d", log));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100119
Charles Chan72779502016-04-23 17:36:10 -0700120 /**
Charles Chanc91c8782016-03-30 17:54:24 -0700121 * Constructs the McastEventHandler.
122 *
123 * @param srManager Segment Routing manager
124 */
Charles Chan1eaf4802016-04-18 13:44:03 -0700125 public McastHandler(SegmentRoutingManager srManager) {
Pier7b657162018-03-27 11:29:42 -0700126 ApplicationId coreAppId = srManager.coreService.getAppId(CoreService.CORE_APP_NAME);
Charles Chanc91c8782016-03-30 17:54:24 -0700127 this.srManager = srManager;
Charles Chanc91c8782016-03-30 17:54:24 -0700128 this.topologyService = srManager.topologyService;
Pier7b657162018-03-27 11:29:42 -0700129 KryoNamespace.Builder mcastKryo = new KryoNamespace.Builder()
Charles Chanc91c8782016-03-30 17:54:24 -0700130 .register(KryoNamespaces.API)
Piere99511d2018-04-19 16:47:06 +0200131 .register(new McastStoreKeySerializer(), McastStoreKey.class);
Pier7b657162018-03-27 11:29:42 -0700132 mcastNextObjStore = srManager.storageService
Charles Chan72779502016-04-23 17:36:10 -0700133 .<McastStoreKey, NextObjective>consistentMapBuilder()
Charles Chanc91c8782016-03-30 17:54:24 -0700134 .withName("onos-mcast-nextobj-store")
Charles Chan4922a172016-05-23 16:45:45 -0700135 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-NextObj")))
Charles Chanc91c8782016-03-30 17:54:24 -0700136 .build();
Piere99511d2018-04-19 16:47:06 +0200137 mcastKryo = new KryoNamespace.Builder()
138 .register(KryoNamespaces.API)
139 .register(new McastRoleStoreKeySerializer(), McastRoleStoreKey.class)
140 .register(McastRole.class);
Pier7b657162018-03-27 11:29:42 -0700141 mcastRoleStore = srManager.storageService
Piere99511d2018-04-19 16:47:06 +0200142 .<McastRoleStoreKey, McastRole>consistentMapBuilder()
Charles Chan72779502016-04-23 17:36:10 -0700143 .withName("onos-mcast-role-store")
Charles Chan4922a172016-05-23 16:45:45 -0700144 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-Role")))
Charles Chan72779502016-04-23 17:36:10 -0700145 .build();
Harshada Chaundkar9204f312019-07-02 16:01:24 +0000146 mcastKryo = new KryoNamespace.Builder()
147 .register(KryoNamespaces.API)
148 .register(new McastFilteringObjStoreSerializer(), McastFilteringObjStoreKey.class);
149 mcastFilteringObjStore = srManager.storageService
150 .<McastFilteringObjStoreKey>setBuilder()
151 .withName("onos-mcast-filtering-store")
152 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-FilteringObj")))
153 .build()
154 .asDistributedSet();
Pier7b657162018-03-27 11:29:42 -0700155 mcastUtils = new McastUtils(srManager, coreAppId, log);
pier62e0b072019-12-23 19:21:49 +0100156 // Init the executor for the buckets corrector
157 mcastCorrector.scheduleWithFixedDelay(new McastBucketCorrector(), 10,
158 MCAST_VERIFY_INTERVAL, TimeUnit.SECONDS);
159 }
160
161 /**
162 * Determines if mcast in the network has been stable in the last
163 * MCAST_STABLITY_THRESHOLD seconds, by comparing the current time
164 * to the last mcast change timestamp.
165 *
166 * @return true if stable
167 */
168 private boolean isMcastStable() {
169 long last = (long) (lastMcastChange.get().toEpochMilli() / 1000.0);
170 long now = (long) (Instant.now().toEpochMilli() / 1000.0);
171 log.trace("Multicast stable since {}s", now - last);
172 return (now - last) > MCAST_STABLITY_THRESHOLD;
173 }
174
175 /**
176 * Assures there are always MCAST_VERIFY_INTERVAL seconds between each execution,
177 * by comparing the current time with the last corrector execution.
178 *
179 * @return true if stable
180 */
181 private boolean wasBktCorrRunning() {
182 long last = (long) (lastBktCorrExecution.get().toEpochMilli() / 1000.0);
183 long now = (long) (Instant.now().toEpochMilli() / 1000.0);
184 log.trace("McastBucketCorrector executed {}s ago", now - last);
185 return (now - last) < MCAST_VERIFY_INTERVAL;
Charles Chan72779502016-04-23 17:36:10 -0700186 }
187
188 /**
Piere99511d2018-04-19 16:47:06 +0200189 * Read initial multicast configuration from mcast store.
Charles Chan72779502016-04-23 17:36:10 -0700190 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100191 public void init() {
pier62e0b072019-12-23 19:21:49 +0100192 mcastWorker.execute(this::initInternal);
193 }
194
195 private void initInternal() {
196 lastMcastChange.set(Instant.now());
197 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
198 log.debug("Init group {}", mcastRoute.group());
199 if (!mcastUtils.isLeader(mcastRoute.group())) {
200 log.debug("Skip {} due to lack of leadership", mcastRoute.group());
201 return;
202 }
203 McastRouteData mcastRouteData = srManager.multicastRouteService.routeData(mcastRoute);
204 // For each source process the mcast tree
205 srManager.multicastRouteService.sources(mcastRoute).forEach(source -> {
206 Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
207 Set<DeviceId> visited = Sets.newHashSet();
208 List<ConnectPoint> currentPath = Lists.newArrayList(source);
209 mcastUtils.buildMcastPaths(mcastNextObjStore.asJavaMap(), source.deviceId(), visited, mcastPaths,
210 currentPath, mcastRoute.group(), source);
211 // Get all the sinks and process them
212 Set<ConnectPoint> sinks = processSinksToBeAdded(source, mcastRoute.group(),
213 mcastRouteData.sinks());
214 // Filter out all the working sinks, we do not want to move them
215 // TODO we need a better way to distinguish flows coming from different sources
216 sinks = sinks.stream()
217 .filter(sink -> !mcastPaths.containsKey(sink) ||
218 !isSinkForSource(mcastRoute.group(), sink, source))
219 .collect(Collectors.toSet());
220 if (sinks.isEmpty()) {
221 log.debug("Skip {} for source {} nothing to do", mcastRoute.group(), source);
Pierdb27b8d2018-04-17 16:29:56 +0200222 return;
223 }
pier62e0b072019-12-23 19:21:49 +0100224 Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(mcastRoute.group(),
225 source.deviceId(), sinks);
226 mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink,
227 mcastRoute.group(), paths));
Pier7b657162018-03-27 11:29:42 -0700228 });
pier62e0b072019-12-23 19:21:49 +0100229 });
Charles Chanc91c8782016-03-30 17:54:24 -0700230 }
231
232 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +0100233 * Clean up when deactivating the application.
234 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100235 public void terminate() {
pier62e0b072019-12-23 19:21:49 +0100236 mcastCorrector.shutdown();
237 mcastWorker.shutdown();
Pier72d0e582018-04-20 14:14:34 +0200238 mcastNextObjStore.destroy();
239 mcastRoleStore.destroy();
Harshada Chaundkar9204f312019-07-02 16:01:24 +0000240 mcastFilteringObjStore.destroy();
Pier72d0e582018-04-20 14:14:34 +0200241 mcastUtils.terminate();
242 log.info("Terminated");
Pier Luigi35dab3f2018-01-25 16:16:02 +0100243 }
244
245 /**
Pier Luigid29ca7c2018-02-28 17:24:03 +0100246 * Processes the SOURCE_ADDED, SOURCE_UPDATED, SINK_ADDED,
Piere99511d2018-04-19 16:47:06 +0200247 * SINK_REMOVED, ROUTE_ADDED and ROUTE_REMOVED events.
Charles Chanc91c8782016-03-30 17:54:24 -0700248 *
pier62e0b072019-12-23 19:21:49 +0100249 * @param event the multicast event to be processed
Charles Chanc91c8782016-03-30 17:54:24 -0700250 */
Pier Luigid29ca7c2018-02-28 17:24:03 +0100251 public void processMcastEvent(McastEvent event) {
pier62e0b072019-12-23 19:21:49 +0100252 mcastWorker.execute(() -> processMcastEventInternal(event));
253 }
254
255 private void processMcastEventInternal(McastEvent event) {
256 lastMcastChange.set(Instant.now());
257 // Current subject is null, for ROUTE_REMOVED events
258 final McastRouteUpdate mcastUpdate = event.subject();
259 final McastRouteUpdate mcastPrevUpdate = event.prevSubject();
260 IpAddress mcastIp = mcastPrevUpdate.route().group();
261 Set<ConnectPoint> prevSinks = mcastPrevUpdate.sinks()
262 .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
263 Set<ConnectPoint> prevSources = mcastPrevUpdate.sources()
264 .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
265 Set<ConnectPoint> sources;
266 // Events handling
Pierdb27b8d2018-04-17 16:29:56 +0200267 if (event.type() == ROUTE_ADDED) {
pier62e0b072019-12-23 19:21:49 +0100268 processRouteAddedInternal(mcastUpdate.route().group());
269 } else if (event.type() == ROUTE_REMOVED) {
270 processRouteRemovedInternal(prevSources, mcastIp);
271 } else if (event.type() == SOURCES_ADDED) {
272 // Current subject and prev just differ for the source connect points
273 sources = mcastUpdate.sources()
274 .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
275 Set<ConnectPoint> sourcesToBeAdded = Sets.difference(sources, prevSources);
276 processSourcesAddedInternal(sourcesToBeAdded, mcastIp, mcastUpdate.sinks());
277 } else if (event.type() == SOURCES_REMOVED) {
278 // Current subject and prev just differ for the source connect points
279 sources = mcastUpdate.sources()
280 .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
281 Set<ConnectPoint> sourcesToBeRemoved = Sets.difference(prevSources, sources);
282 processSourcesRemovedInternal(sourcesToBeRemoved, sources, mcastIp, mcastUpdate.sinks());
283 } else if (event.type() == SINKS_ADDED) {
284 processSinksAddedInternal(prevSources, mcastIp, mcastUpdate.sinks(), prevSinks);
285 } else if (event.type() == SINKS_REMOVED) {
286 processSinksRemovedInternal(prevSources, mcastIp, mcastUpdate.sinks(), mcastPrevUpdate.sinks());
Pierdb27b8d2018-04-17 16:29:56 +0200287 } else {
pier62e0b072019-12-23 19:21:49 +0100288 log.warn("Event {} not handled", event);
Pierdb27b8d2018-04-17 16:29:56 +0200289 }
Pier Luigi6786b922018-02-02 16:19:11 +0100290 }
291
292 /**
Piere99511d2018-04-19 16:47:06 +0200293 * Process the SOURCES_ADDED event.
294 *
295 * @param sources the sources connect point
296 * @param mcastIp the group address
297 * @param sinks the sinks connect points
298 */
299 private void processSourcesAddedInternal(Set<ConnectPoint> sources, IpAddress mcastIp,
300 Map<HostId, Set<ConnectPoint>> sinks) {
pier62e0b072019-12-23 19:21:49 +0100301 lastMcastChange.set(Instant.now());
302 log.info("Processing sources added {} for group {}", sources, mcastIp);
303 if (!mcastUtils.isLeader(mcastIp)) {
304 log.debug("Skip {} due to lack of leadership", mcastIp);
305 return;
Piere99511d2018-04-19 16:47:06 +0200306 }
pier62e0b072019-12-23 19:21:49 +0100307 if (sources.isEmpty()) {
308 log.debug("Skip {} due to empty sources to be added", mcastIp);
309 return;
310 }
311 sources.forEach(source -> {
312 Set<ConnectPoint> sinksToBeAdded = processSinksToBeAdded(source, mcastIp, sinks);
313 Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(mcastIp, source.deviceId(),
314 sinksToBeAdded);
315 mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink, mcastIp, paths));
316 });
Piere99511d2018-04-19 16:47:06 +0200317 }
318
319 /**
320 * Process the SOURCES_REMOVED event.
321 *
322 * @param sourcesToBeRemoved the source connect points to be removed
323 * @param remainingSources the remainig source connect points
324 * @param mcastIp the group address
325 * @param sinks the sinks connect points
326 */
327 private void processSourcesRemovedInternal(Set<ConnectPoint> sourcesToBeRemoved,
328 Set<ConnectPoint> remainingSources,
329 IpAddress mcastIp,
330 Map<HostId, Set<ConnectPoint>> sinks) {
pier62e0b072019-12-23 19:21:49 +0100331 lastMcastChange.set(Instant.now());
332 log.info("Processing sources removed {} for group {}", sourcesToBeRemoved, mcastIp);
333 if (!mcastUtils.isLeader(mcastIp)) {
334 log.debug("Skip {} due to lack of leadership", mcastIp);
335 return;
336 }
337 if (remainingSources.isEmpty()) {
338 log.debug("There are no more sources for {}", mcastIp);
339 processRouteRemovedInternal(sourcesToBeRemoved, mcastIp);
340 return;
341 }
342 // Skip offline devices
343 Set<ConnectPoint> candidateSources = sourcesToBeRemoved.stream()
344 .filter(source -> srManager.deviceService.isAvailable(source.deviceId()))
345 .collect(Collectors.toSet());
346 if (candidateSources.isEmpty()) {
347 log.debug("Skip {} due to empty sources to be removed", mcastIp);
348 return;
349 }
350 // Let's heal the trees
351 Set<Link> remainingLinks = Sets.newHashSet();
352 Map<ConnectPoint, Set<Link>> candidateLinks = Maps.newHashMap();
353 Map<ConnectPoint, Set<ConnectPoint>> candidateSinks = Maps.newHashMap();
354 Set<ConnectPoint> totalSources = Sets.newHashSet(candidateSources);
355 totalSources.addAll(remainingSources);
356 // Calculate all the links used by the sources
357 totalSources.forEach(source -> {
358 Set<ConnectPoint> currentSinks = sinks.values()
359 .stream().flatMap(Collection::stream)
360 .filter(sink -> isSinkForSource(mcastIp, sink, source))
Piere99511d2018-04-19 16:47:06 +0200361 .collect(Collectors.toSet());
pier62e0b072019-12-23 19:21:49 +0100362 candidateSinks.put(source, currentSinks);
363 currentSinks.forEach(currentSink -> {
364 Optional<Path> currentPath = getPath(source.deviceId(), currentSink.deviceId(),
365 mcastIp, null, source);
366 if (currentPath.isPresent()) {
367 if (!candidateSources.contains(source)) {
368 remainingLinks.addAll(currentPath.get().links());
369 } else {
370 candidateLinks.put(source, Sets.newHashSet(currentPath.get().links()));
Piere99511d2018-04-19 16:47:06 +0200371 }
Piere99511d2018-04-19 16:47:06 +0200372 }
373 });
pier62e0b072019-12-23 19:21:49 +0100374 });
375 // Clean transit links
376 candidateLinks.forEach((source, currentCandidateLinks) -> {
377 Set<Link> linksToBeRemoved = Sets.difference(currentCandidateLinks, remainingLinks)
378 .immutableCopy();
379 if (!linksToBeRemoved.isEmpty()) {
380 currentCandidateLinks.forEach(link -> {
381 DeviceId srcLink = link.src().deviceId();
382 // Remove ports only on links to be removed
383 if (linksToBeRemoved.contains(link)) {
384 removePortFromDevice(link.src().deviceId(), link.src().port(), mcastIp,
385 mcastUtils.assignedVlan(srcLink.equals(source.deviceId()) ?
386 source : null));
387 }
388 // Remove role on the candidate links
389 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, srcLink, source));
390 });
391 }
392 });
393 // Clean ingress and egress
394 candidateSources.forEach(source -> {
395 Set<ConnectPoint> currentSinks = candidateSinks.get(source);
396 currentSinks.forEach(currentSink -> {
397 VlanId assignedVlan = mcastUtils.assignedVlan(source.deviceId().equals(currentSink.deviceId()) ?
398 source : null);
399 // Sinks co-located with the source
400 if (source.deviceId().equals(currentSink.deviceId())) {
401 if (source.port().equals(currentSink.port())) {
402 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
403 mcastIp, currentSink, source);
Piere99511d2018-04-19 16:47:06 +0200404 return;
405 }
pier62e0b072019-12-23 19:21:49 +0100406 // We need to check against the other sources and if it is
407 // necessary remove the port from the device - no overlap
Piere99511d2018-04-19 16:47:06 +0200408 Set<VlanId> otherVlans = remainingSources.stream()
pier62e0b072019-12-23 19:21:49 +0100409 // Only sources co-located and having this sink
410 .filter(remainingSource -> remainingSource.deviceId()
411 .equals(source.deviceId()) && candidateSinks.get(remainingSource)
Piere99511d2018-04-19 16:47:06 +0200412 .contains(currentSink))
413 .map(remainingSource -> mcastUtils.assignedVlan(
414 remainingSource.deviceId().equals(currentSink.deviceId()) ?
415 remainingSource : null)).collect(Collectors.toSet());
Piere99511d2018-04-19 16:47:06 +0200416 if (!otherVlans.contains(assignedVlan)) {
417 removePortFromDevice(currentSink.deviceId(), currentSink.port(),
418 mcastIp, assignedVlan);
419 }
420 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, currentSink.deviceId(),
421 source));
pier62e0b072019-12-23 19:21:49 +0100422 return;
423 }
424 Set<VlanId> otherVlans = remainingSources.stream()
425 .filter(remainingSource -> candidateSinks.get(remainingSource)
426 .contains(currentSink))
427 .map(remainingSource -> mcastUtils.assignedVlan(
428 remainingSource.deviceId().equals(currentSink.deviceId()) ?
429 remainingSource : null)).collect(Collectors.toSet());
430 // Sinks on other leaves
431 if (!otherVlans.contains(assignedVlan)) {
432 removePortFromDevice(currentSink.deviceId(), currentSink.port(),
433 mcastIp, assignedVlan);
434 }
435 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, currentSink.deviceId(),
436 source));
Piere99511d2018-04-19 16:47:06 +0200437 });
pier62e0b072019-12-23 19:21:49 +0100438 });
Piere99511d2018-04-19 16:47:06 +0200439 }
440
441 /**
Pierdb27b8d2018-04-17 16:29:56 +0200442 * Process the ROUTE_ADDED event.
Pier Luigie80d6b42018-02-26 12:31:38 +0100443 *
Pierdb27b8d2018-04-17 16:29:56 +0200444 * @param mcastIp the group address
Pier Luigie80d6b42018-02-26 12:31:38 +0100445 */
Pierdb27b8d2018-04-17 16:29:56 +0200446 private void processRouteAddedInternal(IpAddress mcastIp) {
pier62e0b072019-12-23 19:21:49 +0100447 lastMcastChange.set(Instant.now());
448 log.info("Processing route added for Multicast group {}", mcastIp);
449 // Just elect a new leader
450 mcastUtils.isLeader(mcastIp);
Pier Luigie80d6b42018-02-26 12:31:38 +0100451 }
452
453 /**
Pier Luigi6786b922018-02-02 16:19:11 +0100454 * Removes the entire mcast tree related to this group.
Piere99511d2018-04-19 16:47:06 +0200455 * @param sources the source connect points
Pier Luigi6786b922018-02-02 16:19:11 +0100456 * @param mcastIp multicast group IP address
457 */
Piere99511d2018-04-19 16:47:06 +0200458 private void processRouteRemovedInternal(Set<ConnectPoint> sources, IpAddress mcastIp) {
pier62e0b072019-12-23 19:21:49 +0100459 lastMcastChange.set(Instant.now());
460 log.info("Processing route removed for group {}", mcastIp);
461 if (!mcastUtils.isLeader(mcastIp)) {
462 log.debug("Skip {} due to lack of leadership", mcastIp);
Piere99511d2018-04-19 16:47:06 +0200463 mcastUtils.withdrawLeader(mcastIp);
pier62e0b072019-12-23 19:21:49 +0100464 return;
Pier Luigi6786b922018-02-02 16:19:11 +0100465 }
pier62e0b072019-12-23 19:21:49 +0100466 sources.forEach(source -> {
467 // Find out the ingress, transit and egress device of the affected group
468 DeviceId ingressDevice = getDevice(mcastIp, INGRESS, source)
469 .stream().findFirst().orElse(null);
470 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
471 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
472 // If there are no egress and transit devices, sinks could be only on the ingress
473 if (!egressDevices.isEmpty()) {
474 egressDevices.forEach(deviceId -> {
475 removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null));
476 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, deviceId, source));
477 });
478 }
479 if (!transitDevices.isEmpty()) {
480 transitDevices.forEach(deviceId -> {
481 removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null));
482 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, deviceId, source));
483 });
484 }
485 if (ingressDevice != null) {
486 removeGroupFromDevice(ingressDevice, mcastIp, mcastUtils.assignedVlan(source));
487 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, ingressDevice, source));
488 }
489 });
490 // Finally, withdraw the leadership
491 mcastUtils.withdrawLeader(mcastIp);
Pier Luigi6786b922018-02-02 16:19:11 +0100492 }
493
Pier7b657162018-03-27 11:29:42 -0700494 /**
495 * Process sinks to be removed.
496 *
Piere99511d2018-04-19 16:47:06 +0200497 * @param sources the source connect points
Pier7b657162018-03-27 11:29:42 -0700498 * @param mcastIp the ip address of the group
499 * @param newSinks the new sinks to be processed
Pier28164682018-04-17 15:50:43 +0200500 * @param prevSinks the previous sinks
Pier7b657162018-03-27 11:29:42 -0700501 */
Piere99511d2018-04-19 16:47:06 +0200502 private void processSinksRemovedInternal(Set<ConnectPoint> sources, IpAddress mcastIp,
Pier7b657162018-03-27 11:29:42 -0700503 Map<HostId, Set<ConnectPoint>> newSinks,
Pier28164682018-04-17 15:50:43 +0200504 Map<HostId, Set<ConnectPoint>> prevSinks) {
pier62e0b072019-12-23 19:21:49 +0100505 lastMcastChange.set(Instant.now());
506 log.info("Processing sinks removed for group {} and for sources {}",
507 mcastIp, sources);
508 if (!mcastUtils.isLeader(mcastIp)) {
509 log.debug("Skip {} due to lack of leadership", mcastIp);
510 return;
Pier7b657162018-03-27 11:29:42 -0700511 }
pier62e0b072019-12-23 19:21:49 +0100512 Map<ConnectPoint, Map<ConnectPoint, Optional<Path>>> treesToBeRemoved = Maps.newHashMap();
513 Map<ConnectPoint, Set<ConnectPoint>> treesToBeAdded = Maps.newHashMap();
514 sources.forEach(source -> {
515 // Save the path associated to the sinks to be removed
516 Set<ConnectPoint> candidateSinks = processSinksToBeRemoved(mcastIp, prevSinks,
517 newSinks, source);
518 // Skip offline devices
519 Set<ConnectPoint> sinksToBeRemoved = candidateSinks.stream()
520 .filter(sink -> srManager.deviceService.isAvailable(sink.deviceId()))
521 .collect(Collectors.toSet());
522 Map<ConnectPoint, Optional<Path>> treeToBeRemoved = Maps.newHashMap();
523 sinksToBeRemoved.forEach(sink -> treeToBeRemoved.put(sink, getPath(source.deviceId(),
524 sink.deviceId(), mcastIp,
525 null, source)));
526 treesToBeRemoved.put(source, treeToBeRemoved);
527 // Recover the dual-homed sinks
528 Set<ConnectPoint> sinksToBeRecovered = processSinksToBeRecovered(mcastIp, newSinks,
529 prevSinks, source);
530 treesToBeAdded.put(source, sinksToBeRecovered);
531 });
532 // Remove the sinks taking into account the multiple sources and the original paths
533 treesToBeRemoved.forEach((source, tree) ->
534 tree.forEach((sink, path) -> processSinkRemovedInternal(source, sink, mcastIp, path)));
535 // Add new sinks according to the recovery procedure
536 treesToBeAdded.forEach((source, sinks) ->
537 sinks.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null)));
Pier7b657162018-03-27 11:29:42 -0700538 }
539
Pier Luigi6786b922018-02-02 16:19:11 +0100540 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +0100541 * Removes a path from source to sink for given multicast group.
542 *
543 * @param source connect point of the multicast source
544 * @param sink connection point of the multicast sink
545 * @param mcastIp multicast group IP address
Piere99511d2018-04-19 16:47:06 +0200546 * @param mcastPath path associated to the sink
Pier Luigi35dab3f2018-01-25 16:16:02 +0100547 */
548 private void processSinkRemovedInternal(ConnectPoint source, ConnectPoint sink,
Piere99511d2018-04-19 16:47:06 +0200549 IpAddress mcastIp, Optional<Path> mcastPath) {
pier62e0b072019-12-23 19:21:49 +0100550 lastMcastChange.set(Instant.now());
551 log.info("Processing sink removed {} for group {} and for source {}", sink, mcastIp, source);
552 boolean isLast;
553 // When source and sink are on the same device
554 if (source.deviceId().equals(sink.deviceId())) {
555 // Source and sink are on even the same port. There must be something wrong.
556 if (source.port().equals(sink.port())) {
557 log.warn("Skip {} since sink {} is on the same port of source {}. Abort", mcastIp, sink, source);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100558 return;
559 }
pier62e0b072019-12-23 19:21:49 +0100560 isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100561 if (isLast) {
Piere99511d2018-04-19 16:47:06 +0200562 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, sink.deviceId(), source));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100563 }
pier62e0b072019-12-23 19:21:49 +0100564 return;
565 }
566 // Process the egress device
567 isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
568 if (isLast) {
569 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, sink.deviceId(), source));
570 }
571 // If this is the last sink on the device, also update upstream
572 if (mcastPath.isPresent()) {
573 List<Link> links = Lists.newArrayList(mcastPath.get().links());
574 Collections.reverse(links);
575 for (Link link : links) {
576 if (isLast) {
577 isLast = removePortFromDevice(link.src().deviceId(), link.src().port(), mcastIp,
578 mcastUtils.assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100579 if (isLast) {
pier62e0b072019-12-23 19:21:49 +0100580 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, link.src().deviceId(), source));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100581 }
Charles Chanc91c8782016-03-30 17:54:24 -0700582 }
583 }
pier62e0b072019-12-23 19:21:49 +0100584 } else {
585 log.warn("Unable to find a path from {} to {}. Abort sinkRemoved", source.deviceId(), sink.deviceId());
Charles Chanc91c8782016-03-30 17:54:24 -0700586 }
587 }
588
Pier7b657162018-03-27 11:29:42 -0700589 /**
590 * Process sinks to be added.
591 *
Piere99511d2018-04-19 16:47:06 +0200592 * @param sources the source connect points
Pier7b657162018-03-27 11:29:42 -0700593 * @param mcastIp the group IP
594 * @param newSinks the new sinks to be processed
595 * @param allPrevSinks all previous sinks
596 */
Piere99511d2018-04-19 16:47:06 +0200597 private void processSinksAddedInternal(Set<ConnectPoint> sources, IpAddress mcastIp,
Pier7b657162018-03-27 11:29:42 -0700598 Map<HostId, Set<ConnectPoint>> newSinks,
599 Set<ConnectPoint> allPrevSinks) {
pier62e0b072019-12-23 19:21:49 +0100600 lastMcastChange.set(Instant.now());
601 log.info("Processing sinks added for group {} and for sources {}", mcastIp, sources);
602 if (!mcastUtils.isLeader(mcastIp)) {
603 log.debug("Skip {} due to lack of leadership", mcastIp);
604 return;
Pier7b657162018-03-27 11:29:42 -0700605 }
pier62e0b072019-12-23 19:21:49 +0100606 sources.forEach(source -> {
607 Set<ConnectPoint> sinksToBeAdded = processSinksToBeAdded(source, mcastIp, newSinks);
608 sinksToBeAdded = Sets.difference(sinksToBeAdded, allPrevSinks);
609 sinksToBeAdded.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null));
610 });
Pier7b657162018-03-27 11:29:42 -0700611 }
612
Charles Chanc91c8782016-03-30 17:54:24 -0700613 /**
614 * Establishes a path from source to sink for given multicast group.
615 *
616 * @param source connect point of the multicast source
617 * @param sink connection point of the multicast sink
618 * @param mcastIp multicast group IP address
619 */
620 private void processSinkAddedInternal(ConnectPoint source, ConnectPoint sink,
Pier7b657162018-03-27 11:29:42 -0700621 IpAddress mcastIp, List<Path> allPaths) {
pier62e0b072019-12-23 19:21:49 +0100622 lastMcastChange.set(Instant.now());
623 log.info("Processing sink added {} for group {} and for source {}", sink, mcastIp, source);
624 // Process the ingress device
625 McastFilteringObjStoreKey mcastFilterObjStoreKey = new McastFilteringObjStoreKey(source,
626 mcastUtils.assignedVlan(source), mcastIp.isIp4());
627 addFilterToDevice(mcastFilterObjStoreKey, mcastIp, INGRESS);
628 if (source.deviceId().equals(sink.deviceId())) {
629 if (source.port().equals(sink.port())) {
630 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
631 mcastIp, sink, source);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100632 return;
633 }
pier62e0b072019-12-23 19:21:49 +0100634 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
635 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, sink.deviceId(), source), INGRESS);
636 return;
637 }
638 // Find a path. If present, create/update groups and flows for each hop
639 Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp, allPaths, source);
640 if (mcastPath.isPresent()) {
641 List<Link> links = mcastPath.get().links();
642 // Setup mcast role for ingress
643 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, source.deviceId(), source), INGRESS);
644 // Setup properly the transit forwarding
645 links.forEach(link -> {
646 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
647 mcastUtils.assignedVlan(link.src().deviceId()
648 .equals(source.deviceId()) ? source : null));
649 McastFilteringObjStoreKey filteringKey = new McastFilteringObjStoreKey(link.dst(),
650 mcastUtils.assignedVlan(null), mcastIp.isIp4());
651 addFilterToDevice(filteringKey, mcastIp, null);
652 });
653 // Setup mcast role for the transit
654 links.stream()
655 .filter(link -> !link.dst().deviceId().equals(sink.deviceId()))
656 .forEach(link -> {
657 log.trace("Transit links {}", link);
658 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, link.dst().deviceId(),
659 source), TRANSIT);
660 });
661 // Process the egress device
662 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
663 // Setup mcast role for egress
664 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, sink.deviceId(), source), EGRESS);
665 } else {
666 log.warn("Unable to find a path from {} to {}. Abort sinkAdded", source.deviceId(), sink.deviceId());
Charles Chanc91c8782016-03-30 17:54:24 -0700667 }
668 }
669
670 /**
pier62e0b072019-12-23 19:21:49 +0100671 * Processes the PORT_UPDATED event.
Harshada Chaundkar9204f312019-07-02 16:01:24 +0000672 *
673 * @param affectedDevice Affected device
674 * @param affectedPort Affected port
675 */
676 public void processPortUpdate(Device affectedDevice, Port affectedPort) {
pier62e0b072019-12-23 19:21:49 +0100677 mcastWorker.execute(() -> processPortUpdateInternal(affectedDevice, affectedPort));
678 }
679
680 private void processPortUpdateInternal(Device affectedDevice, Port affectedPort) {
Harshada Chaundkar9204f312019-07-02 16:01:24 +0000681 // Clean the filtering obj store. Edge port case.
pier62e0b072019-12-23 19:21:49 +0100682 lastMcastChange.set(Instant.now());
Harshada Chaundkar9204f312019-07-02 16:01:24 +0000683 ConnectPoint portDown = new ConnectPoint(affectedDevice.id(), affectedPort.number());
684 if (!affectedPort.isEnabled()) {
pier62e0b072019-12-23 19:21:49 +0100685 log.info("Processing port down {}", portDown);
Harshada Chaundkar9204f312019-07-02 16:01:24 +0000686 updateFilterObjStoreByPort(portDown);
687 }
688 }
689
690 /**
Charles Chan72779502016-04-23 17:36:10 -0700691 * Processes the LINK_DOWN event.
692 *
piereaddb182020-02-03 13:50:53 +0100693 * @param linkDown Link that is going down
Charles Chan72779502016-04-23 17:36:10 -0700694 */
piereaddb182020-02-03 13:50:53 +0100695 public void processLinkDown(Link linkDown) {
pier62e0b072019-12-23 19:21:49 +0100696 mcastWorker.execute(() -> processLinkDownInternal(linkDown));
697 }
698
699 private void processLinkDownInternal(Link linkDown) {
700 lastMcastChange.set(Instant.now());
701 // Get mcast groups affected by the link going down
702 Set<IpAddress> affectedGroups = getAffectedGroups(linkDown);
703 log.info("Processing link down {} for groups {}", linkDown, affectedGroups);
704 affectedGroups.forEach(mcastIp -> {
705 log.debug("Processing link down {} for group {}", linkDown, mcastIp);
706 recoverFailure(mcastIp, linkDown);
707 });
Charles Chan72779502016-04-23 17:36:10 -0700708 }
709
710 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +0100711 * Process the DEVICE_DOWN event.
712 *
713 * @param deviceDown device going down
714 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100715 public void processDeviceDown(DeviceId deviceDown) {
pier62e0b072019-12-23 19:21:49 +0100716 mcastWorker.execute(() -> processDeviceDownInternal(deviceDown));
717 }
718
719 private void processDeviceDownInternal(DeviceId deviceDown) {
720 lastMcastChange.set(Instant.now());
721 // Get the mcast groups affected by the device going down
722 Set<IpAddress> affectedGroups = getAffectedGroups(deviceDown);
723 log.info("Processing device down {} for groups {}", deviceDown, affectedGroups);
724 updateFilterObjStoreByDevice(deviceDown);
725 affectedGroups.forEach(mcastIp -> {
726 log.debug("Processing device down {} for group {}", deviceDown, mcastIp);
727 recoverFailure(mcastIp, deviceDown);
728 });
Pier Luigi580fd8a2018-01-16 10:47:50 +0100729 }
730
731 /**
Piere99511d2018-04-19 16:47:06 +0200732 * General failure recovery procedure.
733 *
734 * @param mcastIp the group to recover
735 * @param failedElement the failed element
736 */
737 private void recoverFailure(IpAddress mcastIp, Object failedElement) {
738 // TODO Optimize when the group editing is in place
739 if (!mcastUtils.isLeader(mcastIp)) {
740 log.debug("Skip {} due to lack of leadership", mcastIp);
741 return;
742 }
743 // Do not proceed if the sources of this group are missing
744 Set<ConnectPoint> sources = getSources(mcastIp);
745 if (sources.isEmpty()) {
746 log.warn("Missing sources for group {}", mcastIp);
747 return;
748 }
749 // Find out the ingress devices of the affected group
750 // If sinks are in other leafs, we have ingress, transit, egress, and source
751 // If sinks are in the same leaf, we have just ingress and source
752 Set<DeviceId> ingressDevices = getDevice(mcastIp, INGRESS);
753 if (ingressDevices.isEmpty()) {
piereaddb182020-02-03 13:50:53 +0100754 log.warn("Missing ingress devices for group {}", mcastIp);
Piere99511d2018-04-19 16:47:06 +0200755 return;
756 }
757 // For each tree, delete ingress-transit part
758 sources.forEach(source -> {
759 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
760 transitDevices.forEach(transitDevice -> {
761 removeGroupFromDevice(transitDevice, mcastIp, mcastUtils.assignedVlan(null));
762 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, transitDevice, source));
763 });
764 });
765 removeIngressTransitPorts(mcastIp, ingressDevices, sources);
766 // TODO Evaluate the possibility of building optimize trees between sources
767 Map<DeviceId, Set<ConnectPoint>> notRecovered = Maps.newHashMap();
768 sources.forEach(source -> {
769 Set<DeviceId> notRecoveredInternal = Sets.newHashSet();
770 DeviceId ingressDevice = ingressDevices.stream()
771 .filter(deviceId -> deviceId.equals(source.deviceId())).findFirst().orElse(null);
772 // Clean also the ingress
773 if (failedElement instanceof DeviceId && ingressDevice.equals(failedElement)) {
774 removeGroupFromDevice((DeviceId) failedElement, mcastIp, mcastUtils.assignedVlan(source));
775 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, (DeviceId) failedElement, source));
776 }
777 if (ingressDevice == null) {
778 log.warn("Skip failure recovery - " +
779 "Missing ingress for source {} and group {}", source, mcastIp);
780 return;
781 }
782 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
piereaddb182020-02-03 13:50:53 +0100783 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(mcastIp, ingressDevice, egressDevices);
Piere99511d2018-04-19 16:47:06 +0200784 // We have to verify, if there are egresses without paths
785 mcastTree.forEach((egressDevice, paths) -> {
786 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
787 mcastIp, paths, source);
788 // No paths, we have to try with alternative location
789 if (!mcastPath.isPresent()) {
790 notRecovered.compute(egressDevice, (deviceId, listSources) -> {
791 listSources = listSources == null ? Sets.newHashSet() : listSources;
792 listSources.add(source);
793 return listSources;
794 });
795 notRecoveredInternal.add(egressDevice);
796 }
797 });
798 // Fast path, we can recover all the locations
799 if (notRecoveredInternal.isEmpty()) {
800 mcastTree.forEach((egressDevice, paths) -> {
Charles Chanba59dd62018-05-10 22:19:49 +0000801 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
802 mcastIp, paths, source);
803 if (mcastPath.isPresent()) {
804 installPath(mcastIp, source, mcastPath.get());
805 }
Piere99511d2018-04-19 16:47:06 +0200806 });
807 } else {
808 // Let's try to recover using alternative locations
809 recoverSinks(egressDevices, notRecoveredInternal, mcastIp,
810 ingressDevice, source);
811 }
812 });
813 // Finally remove the egresses not recovered
814 notRecovered.forEach((egressDevice, listSources) -> {
815 Set<ConnectPoint> currentSources = getSources(mcastIp, egressDevice, EGRESS);
816 if (Objects.equal(currentSources, listSources)) {
817 log.warn("Fail to recover egress device {} from {} failure {}",
818 egressDevice, failedElement instanceof Link ? "Link" : "Device", failedElement);
819 removeGroupFromDevice(egressDevice, mcastIp, mcastUtils.assignedVlan(null));
820 }
821 listSources.forEach(source -> mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, egressDevice, source)));
822 });
823 }
824
825 /**
Pier7b657162018-03-27 11:29:42 -0700826 * Try to recover sinks using alternate locations.
827 *
828 * @param egressDevices the original egress devices
829 * @param notRecovered the devices not recovered
830 * @param mcastIp the group address
831 * @param ingressDevice the ingress device
832 * @param source the source connect point
Pier7b657162018-03-27 11:29:42 -0700833 */
834 private void recoverSinks(Set<DeviceId> egressDevices, Set<DeviceId> notRecovered,
Piere99511d2018-04-19 16:47:06 +0200835 IpAddress mcastIp, DeviceId ingressDevice, ConnectPoint source) {
836 log.debug("Processing recover sinks for group {} and for source {}",
837 mcastIp, source);
Pier7b657162018-03-27 11:29:42 -0700838 Set<DeviceId> recovered = Sets.difference(egressDevices, notRecovered);
Pier7b657162018-03-27 11:29:42 -0700839 Set<ConnectPoint> totalAffectedSinks = Sets.newHashSet();
Pier7b657162018-03-27 11:29:42 -0700840 Set<ConnectPoint> totalSinks = Sets.newHashSet();
841 // Let's compute all the affected sinks and all the sinks
842 notRecovered.forEach(deviceId -> {
843 totalAffectedSinks.addAll(
Charles Chanba59dd62018-05-10 22:19:49 +0000844 mcastUtils.getAffectedSinks(deviceId, mcastIp).values().stream()
845 .flatMap(Collection::stream)
Pier7b657162018-03-27 11:29:42 -0700846 .filter(connectPoint -> connectPoint.deviceId().equals(deviceId))
Charles Chanba59dd62018-05-10 22:19:49 +0000847 .collect(Collectors.toSet())
848 );
Pier7b657162018-03-27 11:29:42 -0700849 totalSinks.addAll(
Piere99511d2018-04-19 16:47:06 +0200850 mcastUtils.getAffectedSinks(deviceId, mcastIp).values().stream()
Charles Chanba59dd62018-05-10 22:19:49 +0000851 .flatMap(Collection::stream).collect(Collectors.toSet())
852 );
Pier7b657162018-03-27 11:29:42 -0700853 });
Pier7b657162018-03-27 11:29:42 -0700854 Set<ConnectPoint> sinksToBeAdded = Sets.difference(totalSinks, totalAffectedSinks);
Piere99511d2018-04-19 16:47:06 +0200855 Set<DeviceId> newEgressDevices = sinksToBeAdded.stream()
856 .map(ConnectPoint::deviceId).collect(Collectors.toSet());
857 newEgressDevices.addAll(recovered);
858 Set<DeviceId> copyNewEgressDevices = ImmutableSet.copyOf(newEgressDevices);
859 newEgressDevices = newEgressDevices.stream()
860 .filter(deviceId -> !deviceId.equals(ingressDevice)).collect(Collectors.toSet());
piereaddb182020-02-03 13:50:53 +0100861 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(mcastIp, ingressDevice, newEgressDevices);
Pier7b657162018-03-27 11:29:42 -0700862 // if the source was originally in the new locations, add new sinks
Piere99511d2018-04-19 16:47:06 +0200863 if (copyNewEgressDevices.contains(ingressDevice)) {
Pier7b657162018-03-27 11:29:42 -0700864 sinksToBeAdded.stream()
865 .filter(connectPoint -> connectPoint.deviceId().equals(ingressDevice))
866 .forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, ImmutableList.of()));
867 }
Pier7b657162018-03-27 11:29:42 -0700868 // Construct a new path for each egress device
869 mcastTree.forEach((egressDevice, paths) -> {
Piere99511d2018-04-19 16:47:06 +0200870 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice, mcastIp, paths, source);
Pier7b657162018-03-27 11:29:42 -0700871 if (mcastPath.isPresent()) {
872 // Using recovery procedure
873 if (recovered.contains(egressDevice)) {
874 installPath(mcastIp, source, mcastPath.get());
875 } else {
876 // otherwise we need to threat as new sink
877 sinksToBeAdded.stream()
878 .filter(connectPoint -> connectPoint.deviceId().equals(egressDevice))
879 .forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, paths));
880 }
Pier7b657162018-03-27 11:29:42 -0700881 }
882 });
Pier7b657162018-03-27 11:29:42 -0700883 }
884
885 /**
Pier28164682018-04-17 15:50:43 +0200886 * Process all the sinks related to a mcast group and return
887 * the ones to be removed.
888 *
889 * @param mcastIp the group address
890 * @param prevsinks the previous sinks to be evaluated
891 * @param newSinks the new sinks to be evaluted
Piere99511d2018-04-19 16:47:06 +0200892 * @param source the source connect point
Pier28164682018-04-17 15:50:43 +0200893 * @return the set of the sinks to be removed
894 */
895 private Set<ConnectPoint> processSinksToBeRemoved(IpAddress mcastIp,
896 Map<HostId, Set<ConnectPoint>> prevsinks,
Piere99511d2018-04-19 16:47:06 +0200897 Map<HostId, Set<ConnectPoint>> newSinks,
898 ConnectPoint source) {
Pier28164682018-04-17 15:50:43 +0200899 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
piereaddb182020-02-03 13:50:53 +0100900 log.debug("Processing sinks to be removed for Multicast group {}, source {}",
901 mcastIp, source);
Pier28164682018-04-17 15:50:43 +0200902 prevsinks.forEach(((hostId, connectPoints) -> {
Shekhar Aryan27bbe2a2019-06-20 14:03:07 +0000903 if (Objects.equal(HostId.NONE, hostId)) {
Esin Karamanf1f46e32019-03-05 13:49:02 +0000904 //in this case connect points are single homed sinks.
905 //just found the difference btw previous and new sinks for this source.
906 Set<ConnectPoint> difference = Sets.difference(connectPoints, newSinks.get(hostId));
907 sinksToBeProcessed.addAll(difference);
908 return;
909 }
Pier28164682018-04-17 15:50:43 +0200910 // We have to check with the existing flows
911 ConnectPoint sinkToBeProcessed = connectPoints.stream()
Piere99511d2018-04-19 16:47:06 +0200912 .filter(connectPoint -> isSinkForSource(mcastIp, connectPoint, source))
Pier28164682018-04-17 15:50:43 +0200913 .findFirst().orElse(null);
914 if (sinkToBeProcessed != null) {
915 // If the host has been removed or location has been removed
916 if (!newSinks.containsKey(hostId) ||
917 !newSinks.get(hostId).contains(sinkToBeProcessed)) {
918 sinksToBeProcessed.add(sinkToBeProcessed);
919 }
920 }
921 }));
922 // We have done, return the set
923 return sinksToBeProcessed;
924 }
925
926 /**
Pier7b657162018-03-27 11:29:42 -0700927 * Process new locations and return the set of sinks to be added
928 * in the context of the recovery.
929 *
Pier28164682018-04-17 15:50:43 +0200930 * @param newSinks the remaining sinks
931 * @param prevSinks the previous sinks
Piere99511d2018-04-19 16:47:06 +0200932 * @param source the source connect point
Pier7b657162018-03-27 11:29:42 -0700933 * @return the set of the sinks to be processed
934 */
Charles Chanba59dd62018-05-10 22:19:49 +0000935 private Set<ConnectPoint> processSinksToBeRecovered(IpAddress mcastIp,
936 Map<HostId, Set<ConnectPoint>> newSinks,
Piere99511d2018-04-19 16:47:06 +0200937 Map<HostId, Set<ConnectPoint>> prevSinks,
938 ConnectPoint source) {
Pier7b657162018-03-27 11:29:42 -0700939 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
piereaddb182020-02-03 13:50:53 +0100940 log.debug("Processing sinks to be recovered for Multicast group {}, source {}",
941 mcastIp, source);
Pier28164682018-04-17 15:50:43 +0200942 newSinks.forEach((hostId, connectPoints) -> {
Pier7b657162018-03-27 11:29:42 -0700943 // If it has more than 1 locations
944 if (connectPoints.size() > 1 || connectPoints.size() == 0) {
945 log.debug("Skip {} since sink {} has {} locations",
946 mcastIp, hostId, connectPoints.size());
947 return;
948 }
Pier28164682018-04-17 15:50:43 +0200949 // If previously it had two locations, we need to recover it
950 // Filter out if the remaining location is already served
951 if (prevSinks.containsKey(hostId) && prevSinks.get(hostId).size() == 2) {
Pier665b0fc2018-04-19 15:53:20 +0200952 ConnectPoint sinkToBeProcessed = connectPoints.stream()
Piere99511d2018-04-19 16:47:06 +0200953 .filter(connectPoint -> !isSinkForSource(mcastIp, connectPoint, source))
Pier665b0fc2018-04-19 15:53:20 +0200954 .findFirst().orElse(null);
955 if (sinkToBeProcessed != null) {
956 sinksToBeProcessed.add(sinkToBeProcessed);
957 }
Pier28164682018-04-17 15:50:43 +0200958 }
Pier7b657162018-03-27 11:29:42 -0700959 });
960 return sinksToBeProcessed;
961 }
962
963 /**
964 * Process all the sinks related to a mcast group and return
965 * the ones to be processed.
966 *
967 * @param source the source connect point
968 * @param mcastIp the group address
969 * @param sinks the sinks to be evaluated
970 * @return the set of the sinks to be processed
971 */
972 private Set<ConnectPoint> processSinksToBeAdded(ConnectPoint source, IpAddress mcastIp,
973 Map<HostId, Set<ConnectPoint>> sinks) {
Pier7b657162018-03-27 11:29:42 -0700974 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
piereaddb182020-02-03 13:50:53 +0100975 log.debug("Processing sinks to be added for Multicast group {}, source {}",
976 mcastIp, source);
Pier7b657162018-03-27 11:29:42 -0700977 sinks.forEach(((hostId, connectPoints) -> {
Esin Karamanf1f46e32019-03-05 13:49:02 +0000978 //add all connect points that are not tied with any host
Shekhar Aryan27bbe2a2019-06-20 14:03:07 +0000979 if (Objects.equal(HostId.NONE, hostId)) {
Esin Karamanf1f46e32019-03-05 13:49:02 +0000980 sinksToBeProcessed.addAll(connectPoints);
981 return;
982 }
Pier7b657162018-03-27 11:29:42 -0700983 // If it has more than 2 locations
984 if (connectPoints.size() > 2 || connectPoints.size() == 0) {
985 log.debug("Skip {} since sink {} has {} locations",
986 mcastIp, hostId, connectPoints.size());
987 return;
988 }
989 // If it has one location, just use it
990 if (connectPoints.size() == 1) {
Piere99511d2018-04-19 16:47:06 +0200991 sinksToBeProcessed.add(connectPoints.stream().findFirst().orElse(null));
Pier7b657162018-03-27 11:29:42 -0700992 return;
993 }
994 // We prefer to reuse existing flows
995 ConnectPoint sinkToBeProcessed = connectPoints.stream()
Piere99511d2018-04-19 16:47:06 +0200996 .filter(connectPoint -> {
997 if (!isSinkForGroup(mcastIp, connectPoint, source)) {
998 return false;
999 }
1000 if (!isSinkReachable(mcastIp, connectPoint, source)) {
1001 return false;
1002 }
1003 ConnectPoint other = connectPoints.stream()
Charles Chanba59dd62018-05-10 22:19:49 +00001004 .filter(remaining -> !remaining.equals(connectPoint))
1005 .findFirst().orElse(null);
Piere99511d2018-04-19 16:47:06 +02001006 // We are already serving the sink
1007 return !isSinkForSource(mcastIp, other, source);
1008 }).findFirst().orElse(null);
1009
Pier7b657162018-03-27 11:29:42 -07001010 if (sinkToBeProcessed != null) {
1011 sinksToBeProcessed.add(sinkToBeProcessed);
1012 return;
1013 }
1014 // Otherwise we prefer to reuse existing egresses
Piere99511d2018-04-19 16:47:06 +02001015 Set<DeviceId> egresses = getDevice(mcastIp, EGRESS, source);
Pier7b657162018-03-27 11:29:42 -07001016 sinkToBeProcessed = connectPoints.stream()
Piere99511d2018-04-19 16:47:06 +02001017 .filter(connectPoint -> {
1018 if (!egresses.contains(connectPoint.deviceId())) {
1019 return false;
1020 }
1021 if (!isSinkReachable(mcastIp, connectPoint, source)) {
1022 return false;
1023 }
1024 ConnectPoint other = connectPoints.stream()
Charles Chanba59dd62018-05-10 22:19:49 +00001025 .filter(remaining -> !remaining.equals(connectPoint))
1026 .findFirst().orElse(null);
Piere99511d2018-04-19 16:47:06 +02001027 return !isSinkForSource(mcastIp, other, source);
1028 }).findFirst().orElse(null);
Pier7b657162018-03-27 11:29:42 -07001029 if (sinkToBeProcessed != null) {
1030 sinksToBeProcessed.add(sinkToBeProcessed);
1031 return;
1032 }
1033 // Otherwise we prefer a location co-located with the source (if it exists)
1034 sinkToBeProcessed = connectPoints.stream()
1035 .filter(connectPoint -> connectPoint.deviceId().equals(source.deviceId()))
1036 .findFirst().orElse(null);
1037 if (sinkToBeProcessed != null) {
1038 sinksToBeProcessed.add(sinkToBeProcessed);
1039 return;
1040 }
Piere99511d2018-04-19 16:47:06 +02001041 // Finally, we randomly pick a new location if it is reachable
1042 sinkToBeProcessed = connectPoints.stream()
1043 .filter(connectPoint -> {
1044 if (!isSinkReachable(mcastIp, connectPoint, source)) {
1045 return false;
1046 }
1047 ConnectPoint other = connectPoints.stream()
Charles Chanba59dd62018-05-10 22:19:49 +00001048 .filter(remaining -> !remaining.equals(connectPoint))
1049 .findFirst().orElse(null);
Piere99511d2018-04-19 16:47:06 +02001050 return !isSinkForSource(mcastIp, other, source);
1051 }).findFirst().orElse(null);
1052 if (sinkToBeProcessed != null) {
1053 sinksToBeProcessed.add(sinkToBeProcessed);
1054 }
Pier7b657162018-03-27 11:29:42 -07001055 }));
Pier7b657162018-03-27 11:29:42 -07001056 return sinksToBeProcessed;
1057 }
1058
1059 /**
Pier1a7e0c02018-03-12 15:00:54 -07001060 * Utility method to remove all the ingress transit ports.
1061 *
1062 * @param mcastIp the group ip
Piere99511d2018-04-19 16:47:06 +02001063 * @param ingressDevices the ingress devices
1064 * @param sources the source connect points
Pier1a7e0c02018-03-12 15:00:54 -07001065 */
Piere99511d2018-04-19 16:47:06 +02001066 private void removeIngressTransitPorts(IpAddress mcastIp, Set<DeviceId> ingressDevices,
1067 Set<ConnectPoint> sources) {
1068 Map<ConnectPoint, Set<PortNumber>> ingressTransitPorts = Maps.newHashMap();
1069 sources.forEach(source -> {
1070 DeviceId ingressDevice = ingressDevices.stream()
1071 .filter(deviceId -> deviceId.equals(source.deviceId()))
1072 .findFirst().orElse(null);
1073 if (ingressDevice == null) {
1074 log.warn("Skip removeIngressTransitPorts - " +
1075 "Missing ingress for source {} and group {}",
1076 source, mcastIp);
1077 return;
Pier1a7e0c02018-03-12 15:00:54 -07001078 }
Andrea Campanella5b4cd652018-06-05 14:19:21 +02001079 Set<PortNumber> ingressTransitPort = ingressTransitPort(mcastIp, ingressDevice, source);
1080 if (ingressTransitPort.isEmpty()) {
1081 log.warn("No transit ports to remove on device {}", ingressDevice);
1082 return;
1083 }
1084 ingressTransitPorts.put(source, ingressTransitPort);
Pier1a7e0c02018-03-12 15:00:54 -07001085 });
Piere99511d2018-04-19 16:47:06 +02001086 ingressTransitPorts.forEach((source, ports) -> ports.forEach(ingressTransitPort -> {
1087 DeviceId ingressDevice = ingressDevices.stream()
Charles Chanba59dd62018-05-10 22:19:49 +00001088 .filter(deviceId -> deviceId.equals(source.deviceId()))
1089 .findFirst().orElse(null);
Piere99511d2018-04-19 16:47:06 +02001090 boolean isLast = removePortFromDevice(ingressDevice, ingressTransitPort,
1091 mcastIp, mcastUtils.assignedVlan(source));
1092 if (isLast) {
1093 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, ingressDevice, source));
1094 }
1095 }));
Pier1a7e0c02018-03-12 15:00:54 -07001096 }
1097
1098 /**
Charles Chanc91c8782016-03-30 17:54:24 -07001099 * Adds a port to given multicast group on given device. This involves the
1100 * update of L3 multicast group and multicast routing table entry.
1101 *
1102 * @param deviceId device ID
1103 * @param port port to be added
1104 * @param mcastIp multicast group
1105 * @param assignedVlan assigned VLAN ID
1106 */
Charles Chanba59dd62018-05-10 22:19:49 +00001107 private void addPortToDevice(DeviceId deviceId, PortNumber port,
1108 IpAddress mcastIp, VlanId assignedVlan) {
Piere99511d2018-04-19 16:47:06 +02001109 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId, assignedVlan);
Charles Chanc91c8782016-03-30 17:54:24 -07001110 ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
Pier Luigi4f0dd212018-01-19 10:24:53 +01001111 NextObjective newNextObj;
Charles Chan72779502016-04-23 17:36:10 -07001112 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Charles Chanc91c8782016-03-30 17:54:24 -07001113 // First time someone request this mcast group via this device
1114 portBuilder.add(port);
Pier Luigi4f0dd212018-01-19 10:24:53 +01001115 // New nextObj
Vignesh Ethiraj75790122019-08-26 12:18:42 +00001116 if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
1117 log.debug("Passing 0 as nextId for unconfigured device {}", deviceId);
1118 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
1119 portBuilder.build(), 0).add();
1120 } else {
1121 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
1122 portBuilder.build(), null).add();
1123 }
Pier Luigi4f0dd212018-01-19 10:24:53 +01001124 // Store the new port
1125 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Harshada Chaundkar9204f312019-07-02 16:01:24 +00001126 // Create, store and apply the new nextObj and fwdObj
1127 ObjectiveContext context = new DefaultObjectiveContext(
1128 (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}",
1129 mcastIp, deviceId, port.toLong(), assignedVlan),
1130 (objective, error) -> {
1131 log.warn("Failed to add {} on {}/{}, vlan {}: {}",
1132 mcastIp, deviceId, port.toLong(), assignedVlan, error);
1133 srManager.invalidateNextObj(objective.id());
1134 });
1135 ForwardingObjective fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan,
1136 newNextObj.id()).add(context);
1137 if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
1138 log.debug("skip next and forward flowobjective addition for device: {}", deviceId);
1139 } else {
1140 srManager.flowObjectiveService.next(deviceId, newNextObj);
1141 srManager.flowObjectiveService.forward(deviceId, fwdObj);
1142 }
Charles Chanc91c8782016-03-30 17:54:24 -07001143 } else {
1144 // This device already serves some subscribers of this mcast group
Charles Chan72779502016-04-23 17:36:10 -07001145 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chanc91c8782016-03-30 17:54:24 -07001146 // Stop if the port is already in the nextobj
Pier7b657162018-03-27 11:29:42 -07001147 Set<PortNumber> existingPorts = mcastUtils.getPorts(nextObj.next());
Charles Chanc91c8782016-03-30 17:54:24 -07001148 if (existingPorts.contains(port)) {
piereaddb182020-02-03 13:50:53 +01001149 log.debug("Port {}/{} already exists for {}. Abort", deviceId, port, mcastIp);
Charles Chanc91c8782016-03-30 17:54:24 -07001150 return;
1151 }
Pier Luigi4f0dd212018-01-19 10:24:53 +01001152 // Let's add the port and reuse the previous one
Yuta HIGUCHIbef07b52018-02-09 18:05:23 -08001153 portBuilder.addAll(existingPorts).add(port);
Pier Luigi4f0dd212018-01-19 10:24:53 +01001154 // Reuse previous nextObj
Pier7b657162018-03-27 11:29:42 -07001155 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi4f0dd212018-01-19 10:24:53 +01001156 portBuilder.build(), nextObj.id()).addToExisting();
1157 // Store the final next objective and send only the difference to the driver
1158 mcastNextObjStore.put(mcastStoreKey, newNextObj);
1159 // Add just the new port
1160 portBuilder = ImmutableSet.builder();
1161 portBuilder.add(port);
Pier7b657162018-03-27 11:29:42 -07001162 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi4f0dd212018-01-19 10:24:53 +01001163 portBuilder.build(), nextObj.id()).addToExisting();
Harshada Chaundkar9204f312019-07-02 16:01:24 +00001164 if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
1165 log.debug("skip next flowobjective update for device: {}", deviceId);
1166 } else {
1167 // no need to update the flow here since we have updated the nextobjective/group
1168 // the existing flow will keep pointing to the updated nextobj
1169 srManager.flowObjectiveService.next(deviceId, newNextObj);
1170 }
Vignesh Ethiraj75790122019-08-26 12:18:42 +00001171 }
Charles Chanc91c8782016-03-30 17:54:24 -07001172 }
1173
1174 /**
1175 * Removes a port from given multicast group on given device.
1176 * This involves the update of L3 multicast group and multicast routing
1177 * table entry.
1178 *
1179 * @param deviceId device ID
1180 * @param port port to be added
1181 * @param mcastIp multicast group
1182 * @param assignedVlan assigned VLAN ID
1183 * @return true if this is the last sink on this device
1184 */
Charles Chanba59dd62018-05-10 22:19:49 +00001185 private boolean removePortFromDevice(DeviceId deviceId, PortNumber port,
1186 IpAddress mcastIp, VlanId assignedVlan) {
Charles Chan72779502016-04-23 17:36:10 -07001187 McastStoreKey mcastStoreKey =
Piere99511d2018-04-19 16:47:06 +02001188 new McastStoreKey(mcastIp, deviceId, assignedVlan);
Charles Chanc91c8782016-03-30 17:54:24 -07001189 // This device is not serving this multicast group
Charles Chan72779502016-04-23 17:36:10 -07001190 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Piere99511d2018-04-19 16:47:06 +02001191 return true;
Charles Chanc91c8782016-03-30 17:54:24 -07001192 }
Charles Chan72779502016-04-23 17:36:10 -07001193 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Pier7b657162018-03-27 11:29:42 -07001194 Set<PortNumber> existingPorts = mcastUtils.getPorts(nextObj.next());
Charles Chan72779502016-04-23 17:36:10 -07001195 // This port does not serve this multicast group
Charles Chanc91c8782016-03-30 17:54:24 -07001196 if (!existingPorts.contains(port)) {
Piere99511d2018-04-19 16:47:06 +02001197 if (!existingPorts.isEmpty()) {
piereaddb182020-02-03 13:50:53 +01001198 log.debug("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
Piere99511d2018-04-19 16:47:06 +02001199 return false;
1200 }
1201 return true;
Charles Chanc91c8782016-03-30 17:54:24 -07001202 }
1203 // Copy and modify the ImmutableSet
1204 existingPorts = Sets.newHashSet(existingPorts);
1205 existingPorts.remove(port);
Charles Chanc91c8782016-03-30 17:54:24 -07001206 NextObjective newNextObj;
Pier Luigi8cd46de2018-01-19 10:24:53 +01001207 ObjectiveContext context;
Charles Chanc91c8782016-03-30 17:54:24 -07001208 ForwardingObjective fwdObj;
1209 if (existingPorts.isEmpty()) {
Pier Luigi8cd46de2018-01-19 10:24:53 +01001210 context = new DefaultObjectiveContext(
Charles Chan72779502016-04-23 17:36:10 -07001211 (objective) -> log.debug("Successfully remove {} on {}/{}, vlan {}",
1212 mcastIp, deviceId, port.toLong(), assignedVlan),
Piere99511d2018-04-19 16:47:06 +02001213 (objective, error) -> log.warn("Failed to remove {} on {}/{}, vlan {}: {}",
Charles Chan72779502016-04-23 17:36:10 -07001214 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Pier7b657162018-03-27 11:29:42 -07001215 fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
Harshada Chaundkar9204f312019-07-02 16:01:24 +00001216 if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
1217 log.debug("skip forward flowobjective removal for device: {}", deviceId);
1218 } else {
1219 srManager.flowObjectiveService.forward(deviceId, fwdObj);
1220 }
Charles Chan72779502016-04-23 17:36:10 -07001221 mcastNextObjStore.remove(mcastStoreKey);
Charles Chanc91c8782016-03-30 17:54:24 -07001222 } else {
Pier Luigi8cd46de2018-01-19 10:24:53 +01001223 // Here we store the next objective with the remaining port
Pier7b657162018-03-27 11:29:42 -07001224 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi8cd46de2018-01-19 10:24:53 +01001225 existingPorts, nextObj.id()).removeFromExisting();
Charles Chan72779502016-04-23 17:36:10 -07001226 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Harshada Chaundkar9204f312019-07-02 16:01:24 +00001227 // Let's modify the next objective removing the bucket
1228 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi8cd46de2018-01-19 10:24:53 +01001229 ImmutableSet.of(port), nextObj.id()).removeFromExisting();
Harshada Chaundkar9204f312019-07-02 16:01:24 +00001230 if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
1231 log.debug("skip next flowobjective update for device: {}", deviceId);
1232 } else {
1233 // no need to update the flow here since we have updated the next objective + group
1234 // the existing flow will keep pointing to the updated nextobj
1235 srManager.flowObjectiveService.next(deviceId, newNextObj);
1236 }
Vignesh Ethiraj75790122019-08-26 12:18:42 +00001237 }
Charles Chanc91c8782016-03-30 17:54:24 -07001238 return existingPorts.isEmpty();
1239 }
1240
Charles Chan72779502016-04-23 17:36:10 -07001241 /**
1242 * Removes entire group on given device.
1243 *
1244 * @param deviceId device ID
1245 * @param mcastIp multicast group to be removed
1246 * @param assignedVlan assigned VLAN ID
1247 */
Charles Chanba59dd62018-05-10 22:19:49 +00001248 private void removeGroupFromDevice(DeviceId deviceId, IpAddress mcastIp,
1249 VlanId assignedVlan) {
Piere99511d2018-04-19 16:47:06 +02001250 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId, assignedVlan);
Charles Chan72779502016-04-23 17:36:10 -07001251 // This device is not serving this multicast group
1252 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
piereaddb182020-02-03 13:50:53 +01001253 log.debug("{} is not serving {}. Abort.", deviceId, mcastIp);
Charles Chan72779502016-04-23 17:36:10 -07001254 return;
1255 }
1256 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chan72779502016-04-23 17:36:10 -07001257 ObjectiveContext context = new DefaultObjectiveContext(
1258 (objective) -> log.debug("Successfully remove {} on {}, vlan {}",
1259 mcastIp, deviceId, assignedVlan),
Piere99511d2018-04-19 16:47:06 +02001260 (objective, error) -> log.warn("Failed to remove {} on {}, vlan {}: {}",
Charles Chan72779502016-04-23 17:36:10 -07001261 mcastIp, deviceId, assignedVlan, error));
Vignesh Ethiraj75790122019-08-26 12:18:42 +00001262 if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
1263 log.debug("skip flow changes on unconfigured device: {}", deviceId);
1264 } else {
1265 ForwardingObjective fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
1266 srManager.flowObjectiveService.forward(deviceId, fwdObj);
1267 }
Charles Chan72779502016-04-23 17:36:10 -07001268 mcastNextObjStore.remove(mcastStoreKey);
Charles Chan72779502016-04-23 17:36:10 -07001269 }
1270
Pier Luigi580fd8a2018-01-16 10:47:50 +01001271 private void installPath(IpAddress mcastIp, ConnectPoint source, Path mcastPath) {
Pier Luigi580fd8a2018-01-16 10:47:50 +01001272 List<Link> links = mcastPath.links();
kezhiyong168fbba2018-12-03 16:14:29 +08001273 if (links.isEmpty()) {
1274 log.warn("There is no link that can be used. Stopping installation.");
1275 return;
1276 }
Pier1a7e0c02018-03-12 15:00:54 -07001277 // Setup new ingress mcast role
Piere99511d2018-04-19 16:47:06 +02001278 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, links.get(0).src().deviceId(), source),
Pier1a7e0c02018-03-12 15:00:54 -07001279 INGRESS);
Pier Luigi580fd8a2018-01-16 10:47:50 +01001280 // For each link, modify the next on the source device adding the src port
1281 // and a new filter objective on the destination port
1282 links.forEach(link -> {
1283 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
Pier7b657162018-03-27 11:29:42 -07001284 mcastUtils.assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
Harshada Chaundkar9204f312019-07-02 16:01:24 +00001285 McastFilteringObjStoreKey mcastFilterObjStoreKey = new McastFilteringObjStoreKey(link.dst(),
1286 mcastUtils.assignedVlan(null), mcastIp.isIp4());
1287 addFilterToDevice(mcastFilterObjStoreKey, mcastIp, null);
Pier Luigi580fd8a2018-01-16 10:47:50 +01001288 });
Pier1a7e0c02018-03-12 15:00:54 -07001289 // Setup mcast role for the transit
1290 links.stream()
1291 .filter(link -> !link.src().deviceId().equals(source.deviceId()))
Piere99511d2018-04-19 16:47:06 +02001292 .forEach(link -> mcastRoleStore.put(new McastRoleStoreKey(mcastIp, link.src().deviceId(), source),
Pier1a7e0c02018-03-12 15:00:54 -07001293 TRANSIT));
Charles Chan72779502016-04-23 17:36:10 -07001294 }
1295
Charles Chanc91c8782016-03-30 17:54:24 -07001296 /**
Pier1f87aca2018-03-14 16:47:32 -07001297 * Go through all the paths, looking for shared links to be used
1298 * in the final path computation.
1299 *
1300 * @param egresses egress devices
1301 * @param availablePaths all the available paths towards the egress
1302 * @return shared links between egress devices
1303 */
Charles Chanba59dd62018-05-10 22:19:49 +00001304 private Set<Link> exploreMcastTree(Set<DeviceId> egresses,
1305 Map<DeviceId, List<Path>> availablePaths) {
Pier1f87aca2018-03-14 16:47:32 -07001306 int minLength = Integer.MAX_VALUE;
1307 int length;
Pier1f87aca2018-03-14 16:47:32 -07001308 List<Path> currentPaths;
1309 // Verify the source can still reach all the egresses
1310 for (DeviceId egress : egresses) {
1311 // From the source we cannot reach all the sinks
Pier7b657162018-03-27 11:29:42 -07001312 // just continue and let's figure out after
Pier1f87aca2018-03-14 16:47:32 -07001313 currentPaths = availablePaths.get(egress);
1314 if (currentPaths.isEmpty()) {
1315 continue;
1316 }
Piere99511d2018-04-19 16:47:06 +02001317 // Get the length of the first one available, update the min length
Pier1f87aca2018-03-14 16:47:32 -07001318 length = currentPaths.get(0).links().size();
1319 if (length < minLength) {
1320 minLength = length;
1321 }
Pier Luigi51ee7c02018-02-23 19:57:40 +01001322 }
Pier1f87aca2018-03-14 16:47:32 -07001323 // If there are no paths
1324 if (minLength == Integer.MAX_VALUE) {
1325 return Collections.emptySet();
1326 }
Pier1f87aca2018-03-14 16:47:32 -07001327 int index = 0;
Pier1f87aca2018-03-14 16:47:32 -07001328 Set<Link> sharedLinks = Sets.newHashSet();
1329 Set<Link> currentSharedLinks;
1330 Set<Link> currentLinks;
Pier7b657162018-03-27 11:29:42 -07001331 DeviceId egressToRemove = null;
Pier1f87aca2018-03-14 16:47:32 -07001332 // Let's find out the shared links
1333 while (index < minLength) {
1334 // Initialize the intersection with the paths related to the first egress
Piere99511d2018-04-19 16:47:06 +02001335 currentPaths = availablePaths.get(egresses.stream().findFirst().orElse(null));
Pier1f87aca2018-03-14 16:47:32 -07001336 currentSharedLinks = Sets.newHashSet();
1337 // Iterate over the paths and take the "index" links
1338 for (Path path : currentPaths) {
1339 currentSharedLinks.add(path.links().get(index));
1340 }
1341 // Iterate over the remaining egress
1342 for (DeviceId egress : egresses) {
1343 // Iterate over the paths and take the "index" links
1344 currentLinks = Sets.newHashSet();
1345 for (Path path : availablePaths.get(egress)) {
1346 currentLinks.add(path.links().get(index));
1347 }
1348 // Do intersection
1349 currentSharedLinks = Sets.intersection(currentSharedLinks, currentLinks);
1350 // If there are no shared paths exit and record the device to remove
1351 // we have to retry with a subset of sinks
1352 if (currentSharedLinks.isEmpty()) {
Pier7b657162018-03-27 11:29:42 -07001353 egressToRemove = egress;
Pier1f87aca2018-03-14 16:47:32 -07001354 index = minLength;
1355 break;
1356 }
1357 }
1358 sharedLinks.addAll(currentSharedLinks);
1359 index++;
1360 }
Piere99511d2018-04-19 16:47:06 +02001361 // If the shared links is empty and there are egress let's retry another time with less sinks,
1362 // we can still build optimal subtrees
Pier7b657162018-03-27 11:29:42 -07001363 if (sharedLinks.isEmpty() && egresses.size() > 1 && egressToRemove != null) {
1364 egresses.remove(egressToRemove);
Pier1f87aca2018-03-14 16:47:32 -07001365 sharedLinks = exploreMcastTree(egresses, availablePaths);
1366 }
1367 return sharedLinks;
1368 }
1369
1370 /**
1371 * Build Mcast tree having as root the given source and as leaves the given egress points.
1372 *
piereaddb182020-02-03 13:50:53 +01001373 * @param mcastIp multicast group
Pier1f87aca2018-03-14 16:47:32 -07001374 * @param source source of the tree
1375 * @param sinks leaves of the tree
1376 * @return the computed Mcast tree
1377 */
piereaddb182020-02-03 13:50:53 +01001378 private Map<ConnectPoint, List<Path>> computeSinkMcastTree(IpAddress mcastIp,
1379 DeviceId source,
Charles Chanba59dd62018-05-10 22:19:49 +00001380 Set<ConnectPoint> sinks) {
Pier1f87aca2018-03-14 16:47:32 -07001381 // Get the egress devices, remove source from the egress if present
Piere99511d2018-04-19 16:47:06 +02001382 Set<DeviceId> egresses = sinks.stream().map(ConnectPoint::deviceId)
1383 .filter(deviceId -> !deviceId.equals(source)).collect(Collectors.toSet());
piereaddb182020-02-03 13:50:53 +01001384 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(mcastIp, source, egresses);
Pier1f87aca2018-03-14 16:47:32 -07001385 final Map<ConnectPoint, List<Path>> finalTree = Maps.newHashMap();
Pier7b657162018-03-27 11:29:42 -07001386 // We need to put back the source if it was originally present
1387 sinks.forEach(sink -> {
1388 List<Path> sinkPaths = mcastTree.get(sink.deviceId());
1389 finalTree.put(sink, sinkPaths != null ? sinkPaths : ImmutableList.of());
1390 });
Pier1f87aca2018-03-14 16:47:32 -07001391 return finalTree;
1392 }
1393
1394 /**
1395 * Build Mcast tree having as root the given source and as leaves the given egress.
1396 *
piereaddb182020-02-03 13:50:53 +01001397 * @param mcastIp multicast group
Pier1f87aca2018-03-14 16:47:32 -07001398 * @param source source of the tree
1399 * @param egresses leaves of the tree
1400 * @return the computed Mcast tree
1401 */
piereaddb182020-02-03 13:50:53 +01001402 private Map<DeviceId, List<Path>> computeMcastTree(IpAddress mcastIp,
1403 DeviceId source,
Pier1f87aca2018-03-14 16:47:32 -07001404 Set<DeviceId> egresses) {
piereaddb182020-02-03 13:50:53 +01001405 log.debug("Computing tree for Multicast group {}, source {} and leafs {}",
1406 mcastIp, source, egresses);
Pier1f87aca2018-03-14 16:47:32 -07001407 // Pre-compute all the paths
1408 Map<DeviceId, List<Path>> availablePaths = Maps.newHashMap();
Pier1f87aca2018-03-14 16:47:32 -07001409 egresses.forEach(egress -> availablePaths.put(egress, getPaths(source, egress,
1410 Collections.emptySet())));
1411 // Explore the topology looking for shared links amongst the egresses
1412 Set<Link> linksToEnforce = exploreMcastTree(Sets.newHashSet(egresses), availablePaths);
Pier1f87aca2018-03-14 16:47:32 -07001413 // Build the final paths enforcing the shared links between egress devices
Piere99511d2018-04-19 16:47:06 +02001414 availablePaths.clear();
Pier1f87aca2018-03-14 16:47:32 -07001415 egresses.forEach(egress -> availablePaths.put(egress, getPaths(source, egress,
1416 linksToEnforce)));
1417 return availablePaths;
1418 }
1419
1420 /**
1421 * Gets path from src to dst computed using the custom link weigher.
1422 *
1423 * @param src source device ID
1424 * @param dst destination device ID
1425 * @return list of paths from src to dst
1426 */
1427 private List<Path> getPaths(DeviceId src, DeviceId dst, Set<Link> linksToEnforce) {
Pier1f87aca2018-03-14 16:47:32 -07001428 final Topology currentTopology = topologyService.currentTopology();
Pier1f87aca2018-03-14 16:47:32 -07001429 final LinkWeigher linkWeigher = new SRLinkWeigher(srManager, src, linksToEnforce);
Piere99511d2018-04-19 16:47:06 +02001430 List<Path> allPaths = Lists.newArrayList(topologyService.getPaths(currentTopology, src, dst, linkWeigher));
piereaddb182020-02-03 13:50:53 +01001431 log.trace("{} path(s) found from {} to {}", allPaths.size(), src, dst);
Pier1f87aca2018-03-14 16:47:32 -07001432 return allPaths;
Pier Luigi51ee7c02018-02-23 19:57:40 +01001433 }
1434
Charles Chanc91c8782016-03-30 17:54:24 -07001435 /**
1436 * Gets a path from src to dst.
1437 * If a path was allocated before, returns the allocated path.
1438 * Otherwise, randomly pick one from available paths.
1439 *
1440 * @param src source device ID
1441 * @param dst destination device ID
1442 * @param mcastIp multicast group
Pier1f87aca2018-03-14 16:47:32 -07001443 * @param allPaths paths list
Charles Chanc91c8782016-03-30 17:54:24 -07001444 * @return an optional path from src to dst
1445 */
Piere99511d2018-04-19 16:47:06 +02001446 private Optional<Path> getPath(DeviceId src, DeviceId dst, IpAddress mcastIp,
1447 List<Path> allPaths, ConnectPoint source) {
Pier1f87aca2018-03-14 16:47:32 -07001448 if (allPaths == null) {
1449 allPaths = getPaths(src, dst, Collections.emptySet());
1450 }
Charles Chanc91c8782016-03-30 17:54:24 -07001451 if (allPaths.isEmpty()) {
Charles Chanc91c8782016-03-30 17:54:24 -07001452 return Optional.empty();
1453 }
Piere99511d2018-04-19 16:47:06 +02001454 // Create a map index of suitability-to-list of paths. For example
Pier Luigi91573e12018-01-23 16:06:38 +01001455 // a path in the list associated to the index 1 shares only the
1456 // first hop and it is less suitable of a path belonging to the index
1457 // 2 that shares leaf-spine.
1458 Map<Integer, List<Path>> eligiblePaths = Maps.newHashMap();
Pier Luigi91573e12018-01-23 16:06:38 +01001459 int nhop;
1460 McastStoreKey mcastStoreKey;
Pier Luigi91573e12018-01-23 16:06:38 +01001461 PortNumber srcPort;
1462 Set<PortNumber> existingPorts;
1463 NextObjective nextObj;
Pier Luigi91573e12018-01-23 16:06:38 +01001464 for (Path path : allPaths) {
Pier Luigi91573e12018-01-23 16:06:38 +01001465 if (!src.equals(path.links().get(0).src().deviceId())) {
1466 continue;
1467 }
1468 nhop = 0;
1469 // Iterate over the links
Piere99511d2018-04-19 16:47:06 +02001470 for (Link hop : path.links()) {
1471 VlanId assignedVlan = mcastUtils.assignedVlan(hop.src().deviceId().equals(src) ?
1472 source : null);
1473 mcastStoreKey = new McastStoreKey(mcastIp, hop.src().deviceId(), assignedVlan);
1474 // It does not exist in the store, go to the next link
Pier Luigi91573e12018-01-23 16:06:38 +01001475 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Piere99511d2018-04-19 16:47:06 +02001476 continue;
Charles Chanc91c8782016-03-30 17:54:24 -07001477 }
Pier Luigi91573e12018-01-23 16:06:38 +01001478 nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Pier7b657162018-03-27 11:29:42 -07001479 existingPorts = mcastUtils.getPorts(nextObj.next());
Pier Luigi91573e12018-01-23 16:06:38 +01001480 srcPort = hop.src().port();
Piere99511d2018-04-19 16:47:06 +02001481 // the src port is not used as output, go to the next link
Pier Luigi91573e12018-01-23 16:06:38 +01001482 if (!existingPorts.contains(srcPort)) {
Piere99511d2018-04-19 16:47:06 +02001483 continue;
Pier Luigi91573e12018-01-23 16:06:38 +01001484 }
1485 nhop++;
1486 }
1487 // n_hop defines the index
1488 if (nhop > 0) {
1489 eligiblePaths.compute(nhop, (index, paths) -> {
1490 paths = paths == null ? Lists.newArrayList() : paths;
1491 paths.add(path);
1492 return paths;
1493 });
Charles Chanc91c8782016-03-30 17:54:24 -07001494 }
1495 }
Pier Luigi91573e12018-01-23 16:06:38 +01001496 if (eligiblePaths.isEmpty()) {
piereaddb182020-02-03 13:50:53 +01001497 log.trace("No eligiblePath(s) found from {} to {}", src, dst);
Pier Luigi91573e12018-01-23 16:06:38 +01001498 Collections.shuffle(allPaths);
1499 return allPaths.stream().findFirst();
1500 }
Pier Luigi91573e12018-01-23 16:06:38 +01001501 // Let's take the best ones
Piere99511d2018-04-19 16:47:06 +02001502 Integer bestIndex = eligiblePaths.keySet().stream()
1503 .sorted(Comparator.reverseOrder()).findFirst().orElse(null);
Pier Luigi91573e12018-01-23 16:06:38 +01001504 List<Path> bestPaths = eligiblePaths.get(bestIndex);
piereaddb182020-02-03 13:50:53 +01001505 log.trace("{} eligiblePath(s) found from {} to {}",
Pier Luigi91573e12018-01-23 16:06:38 +01001506 bestPaths.size(), src, dst);
Pier Luigi91573e12018-01-23 16:06:38 +01001507 Collections.shuffle(bestPaths);
1508 return bestPaths.stream().findFirst();
Charles Chanc91c8782016-03-30 17:54:24 -07001509 }
1510
1511 /**
Piere99511d2018-04-19 16:47:06 +02001512 * Gets device(s) of given role and of given source in given multicast tree.
1513 *
1514 * @param mcastIp multicast IP
1515 * @param role multicast role
1516 * @param source source connect point
1517 * @return set of device ID or empty set if not found
1518 */
1519 private Set<DeviceId> getDevice(IpAddress mcastIp, McastRole role, ConnectPoint source) {
1520 return mcastRoleStore.entrySet().stream()
1521 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
Charles Chanba59dd62018-05-10 22:19:49 +00001522 entry.getKey().source().equals(source) &&
1523 entry.getValue().value() == role)
Piere99511d2018-04-19 16:47:06 +02001524 .map(Entry::getKey).map(McastRoleStoreKey::deviceId).collect(Collectors.toSet());
1525 }
1526
1527 /**
Charles Chan72779502016-04-23 17:36:10 -07001528 * Gets device(s) of given role in given multicast group.
1529 *
1530 * @param mcastIp multicast IP
1531 * @param role multicast role
1532 * @return set of device ID or empty set if not found
1533 */
1534 private Set<DeviceId> getDevice(IpAddress mcastIp, McastRole role) {
1535 return mcastRoleStore.entrySet().stream()
1536 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
1537 entry.getValue().value() == role)
Piere99511d2018-04-19 16:47:06 +02001538 .map(Entry::getKey).map(McastRoleStoreKey::deviceId).collect(Collectors.toSet());
1539 }
1540
1541 /**
1542 * Gets source(s) of given role, given device in given multicast group.
1543 *
1544 * @param mcastIp multicast IP
1545 * @param deviceId device id
1546 * @param role multicast role
1547 * @return set of device ID or empty set if not found
1548 */
1549 private Set<ConnectPoint> getSources(IpAddress mcastIp, DeviceId deviceId, McastRole role) {
1550 return mcastRoleStore.entrySet().stream()
1551 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
1552 entry.getKey().deviceId().equals(deviceId) && entry.getValue().value() == role)
1553 .map(Entry::getKey).map(McastRoleStoreKey::source).collect(Collectors.toSet());
1554 }
1555
1556 /**
1557 * Gets source(s) of given multicast group.
1558 *
1559 * @param mcastIp multicast IP
1560 * @return set of device ID or empty set if not found
1561 */
1562 private Set<ConnectPoint> getSources(IpAddress mcastIp) {
1563 return mcastRoleStore.entrySet().stream()
1564 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp))
1565 .map(Entry::getKey).map(McastRoleStoreKey::source).collect(Collectors.toSet());
Charles Chan72779502016-04-23 17:36:10 -07001566 }
1567
1568 /**
1569 * Gets groups which is affected by the link down event.
1570 *
1571 * @param link link going down
1572 * @return a set of multicast IpAddress
1573 */
1574 private Set<IpAddress> getAffectedGroups(Link link) {
1575 DeviceId deviceId = link.src().deviceId();
1576 PortNumber port = link.src().port();
1577 return mcastNextObjStore.entrySet().stream()
1578 .filter(entry -> entry.getKey().deviceId().equals(deviceId) &&
Piere99511d2018-04-19 16:47:06 +02001579 mcastUtils.getPorts(entry.getValue().value().next()).contains(port))
1580 .map(Entry::getKey).map(McastStoreKey::mcastIp).collect(Collectors.toSet());
Charles Chan72779502016-04-23 17:36:10 -07001581 }
1582
1583 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +01001584 * Gets groups which are affected by the device down event.
1585 *
1586 * @param deviceId device going down
1587 * @return a set of multicast IpAddress
1588 */
1589 private Set<IpAddress> getAffectedGroups(DeviceId deviceId) {
1590 return mcastNextObjStore.entrySet().stream()
1591 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
Pier1f87aca2018-03-14 16:47:32 -07001592 .map(Entry::getKey).map(McastStoreKey::mcastIp)
Pier Luigi580fd8a2018-01-16 10:47:50 +01001593 .collect(Collectors.toSet());
1594 }
1595
1596 /**
Charles Chan72779502016-04-23 17:36:10 -07001597 * Gets the spine-facing port on ingress device of given multicast group.
1598 *
1599 * @param mcastIp multicast IP
Piere99511d2018-04-19 16:47:06 +02001600 * @param ingressDevice the ingress device
1601 * @param source the source connect point
Charles Chan72779502016-04-23 17:36:10 -07001602 * @return spine-facing port on ingress device
1603 */
Charles Chanba59dd62018-05-10 22:19:49 +00001604 private Set<PortNumber> ingressTransitPort(IpAddress mcastIp, DeviceId ingressDevice,
1605 ConnectPoint source) {
Pier1a7e0c02018-03-12 15:00:54 -07001606 ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
Charles Chan72779502016-04-23 17:36:10 -07001607 if (ingressDevice != null) {
Andrea Campanella5b4cd652018-06-05 14:19:21 +02001608 Versioned<NextObjective> nextObjVers = mcastNextObjStore.get(new McastStoreKey(mcastIp, ingressDevice,
1609 mcastUtils.assignedVlan(source)));
1610 if (nextObjVers == null) {
1611 log.warn("Absent next objective for {}", new McastStoreKey(mcastIp, ingressDevice,
1612 mcastUtils.assignedVlan(source)));
1613 return portBuilder.build();
1614 }
1615 NextObjective nextObj = nextObjVers.value();
Pier7b657162018-03-27 11:29:42 -07001616 Set<PortNumber> ports = mcastUtils.getPorts(nextObj.next());
Pier1a7e0c02018-03-12 15:00:54 -07001617 // Let's find out all the ingress-transit ports
Charles Chan72779502016-04-23 17:36:10 -07001618 for (PortNumber port : ports) {
1619 // Spine-facing port should have no subnet and no xconnect
Pier Luigi69f774d2018-02-28 12:10:50 +01001620 if (srManager.deviceConfiguration() != null &&
1621 srManager.deviceConfiguration().getPortSubnets(ingressDevice, port).isEmpty() &&
Charles Chan8d316332018-06-19 20:31:57 -07001622 (srManager.xconnectService == null ||
1623 !srManager.xconnectService.hasXconnect(new ConnectPoint(ingressDevice, port)))) {
Pier1a7e0c02018-03-12 15:00:54 -07001624 portBuilder.add(port);
Charles Chan72779502016-04-23 17:36:10 -07001625 }
1626 }
1627 }
Pier1a7e0c02018-03-12 15:00:54 -07001628 return portBuilder.build();
Charles Chan72779502016-04-23 17:36:10 -07001629 }
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001630
1631 /**
Pier28164682018-04-17 15:50:43 +02001632 * Verify if a given connect point is sink for this group.
1633 *
1634 * @param mcastIp group address
1635 * @param connectPoint connect point to be verified
Piere99511d2018-04-19 16:47:06 +02001636 * @param source source connect point
Pier28164682018-04-17 15:50:43 +02001637 * @return true if the connect point is sink of the group
1638 */
Charles Chanba59dd62018-05-10 22:19:49 +00001639 private boolean isSinkForGroup(IpAddress mcastIp, ConnectPoint connectPoint,
1640 ConnectPoint source) {
Piere99511d2018-04-19 16:47:06 +02001641 VlanId assignedVlan = mcastUtils.assignedVlan(connectPoint.deviceId().equals(source.deviceId()) ?
1642 source : null);
1643 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, connectPoint.deviceId(), assignedVlan);
Pier28164682018-04-17 15:50:43 +02001644 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
1645 return false;
1646 }
Pier28164682018-04-17 15:50:43 +02001647 NextObjective mcastNext = mcastNextObjStore.get(mcastStoreKey).value();
1648 return mcastUtils.getPorts(mcastNext.next()).contains(connectPoint.port());
1649 }
1650
1651 /**
Piere99511d2018-04-19 16:47:06 +02001652 * Verify if a given connect point is sink for this group and for this source.
1653 *
1654 * @param mcastIp group address
1655 * @param connectPoint connect point to be verified
1656 * @param source source connect point
1657 * @return true if the connect point is sink of the group
1658 */
Charles Chanba59dd62018-05-10 22:19:49 +00001659 private boolean isSinkForSource(IpAddress mcastIp, ConnectPoint connectPoint,
1660 ConnectPoint source) {
Piere99511d2018-04-19 16:47:06 +02001661 boolean isSink = isSinkForGroup(mcastIp, connectPoint, source);
1662 DeviceId device;
1663 if (connectPoint.deviceId().equals(source.deviceId())) {
1664 device = getDevice(mcastIp, INGRESS, source).stream()
1665 .filter(deviceId -> deviceId.equals(connectPoint.deviceId()))
1666 .findFirst().orElse(null);
1667 } else {
1668 device = getDevice(mcastIp, EGRESS, source).stream()
1669 .filter(deviceId -> deviceId.equals(connectPoint.deviceId()))
1670 .findFirst().orElse(null);
1671 }
1672 return isSink && device != null;
1673 }
1674
1675 /**
1676 * Verify if a sink is reachable from this source.
1677 *
1678 * @param mcastIp group address
1679 * @param sink connect point to be verified
1680 * @param source source connect point
1681 * @return true if the connect point is reachable from the source
1682 */
Charles Chanba59dd62018-05-10 22:19:49 +00001683 private boolean isSinkReachable(IpAddress mcastIp, ConnectPoint sink,
1684 ConnectPoint source) {
Piere99511d2018-04-19 16:47:06 +02001685 return sink.deviceId().equals(source.deviceId()) ||
1686 getPath(source.deviceId(), sink.deviceId(), mcastIp, null, source).isPresent();
1687 }
1688
1689 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +01001690 * Updates filtering objective for given device and port.
1691 * It is called in general when the mcast config has been
1692 * changed.
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001693 *
1694 * @param deviceId device ID
1695 * @param portNum ingress port number
1696 * @param vlanId assigned VLAN ID
1697 * @param install true to add, false to remove
1698 */
Charles Chanba59dd62018-05-10 22:19:49 +00001699 public void updateFilterToDevice(DeviceId deviceId, PortNumber portNum,
1700 VlanId vlanId, boolean install) {
pier62e0b072019-12-23 19:21:49 +01001701 mcastWorker.execute(() -> updateFilterToDeviceInternal(deviceId, portNum, vlanId, install));
1702 }
1703
1704 private void updateFilterToDeviceInternal(DeviceId deviceId, PortNumber portNum,
1705 VlanId vlanId, boolean install) {
1706 lastMcastChange.set(Instant.now());
1707 // Iterates over the route and updates properly the filtering objective on the source device.
1708 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
1709 log.debug("Update filter for {}", mcastRoute.group());
1710 if (!mcastUtils.isLeader(mcastRoute.group())) {
1711 log.debug("Skip {} due to lack of leadership", mcastRoute.group());
1712 return;
1713 }
1714 // Get the sources and for each one update properly the filtering objectives
1715 Set<ConnectPoint> sources = srManager.multicastRouteService.sources(mcastRoute);
1716 sources.forEach(source -> {
1717 if (source.deviceId().equals(deviceId) && source.port().equals(portNum)) {
1718 if (install) {
1719 McastFilteringObjStoreKey mcastFilterObjStoreKey = new McastFilteringObjStoreKey(source,
1720 vlanId, mcastRoute.group().isIp4());
1721 addFilterToDevice(mcastFilterObjStoreKey, mcastRoute.group(), INGRESS);
1722 } else {
1723 mcastUtils.removeFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), null);
Pier Luigi35dab3f2018-01-25 16:16:02 +01001724 }
pier62e0b072019-12-23 19:21:49 +01001725 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001726 });
pier62e0b072019-12-23 19:21:49 +01001727 });
Pier Luigi35dab3f2018-01-25 16:16:02 +01001728 }
1729
1730 /**
Harshada Chaundkar9204f312019-07-02 16:01:24 +00001731 * Add filtering to the device if needed.
1732 *
1733 * @param filterObjStoreKey the filtering obj key
1734 * @param mcastIp the multicast group
1735 * @param mcastRole the multicast role
1736 */
1737 private void addFilterToDevice(McastFilteringObjStoreKey filterObjStoreKey,
1738 IpAddress mcastIp,
1739 McastRole mcastRole) {
1740 if (!containsFilterInTheDevice(filterObjStoreKey)) {
1741 // if this is the first sink for this group/device
1742 // match additionally on mac
1743 log.debug("Filtering not available for device {}, vlan {} and {}",
1744 filterObjStoreKey.ingressCP().deviceId(), filterObjStoreKey.vlanId(),
1745 filterObjStoreKey.isIpv4() ? "IPv4" : "IPv6");
1746 mcastUtils.addFilterToDevice(filterObjStoreKey.ingressCP().deviceId(),
1747 filterObjStoreKey.ingressCP().port(),
1748 filterObjStoreKey.vlanId(), mcastIp,
1749 mcastRole, true);
1750 mcastFilteringObjStore.add(filterObjStoreKey);
1751 } else if (!mcastFilteringObjStore.contains(filterObjStoreKey)) {
1752 // match only vlan
1753 log.debug("Filtering not available for connect point {}, vlan {} and {}",
1754 filterObjStoreKey.ingressCP(), filterObjStoreKey.vlanId(),
1755 filterObjStoreKey.isIpv4() ? "IPv4" : "IPv6");
1756 mcastUtils.addFilterToDevice(filterObjStoreKey.ingressCP().deviceId(),
1757 filterObjStoreKey.ingressCP().port(),
1758 filterObjStoreKey.vlanId(), mcastIp,
1759 mcastRole, false);
1760 mcastFilteringObjStore.add(filterObjStoreKey);
1761 } else {
1762 // do nothing
1763 log.debug("Filtering already present. Abort");
1764 }
1765 }
1766
1767 /**
1768 * Verify if there are related filtering obj in the device.
1769 *
1770 * @param filteringKey the filtering obj key
1771 * @return true if related filtering obj are found
1772 */
1773 private boolean containsFilterInTheDevice(McastFilteringObjStoreKey filteringKey) {
1774 // check if filters are already added on the device
1775 McastFilteringObjStoreKey key = mcastFilteringObjStore.stream()
1776 .filter(mcastFilteringKey ->
1777 mcastFilteringKey.ingressCP().deviceId().equals(filteringKey.ingressCP().deviceId())
1778 && mcastFilteringKey.isIpv4() == filteringKey.isIpv4()
1779 && mcastFilteringKey.vlanId().equals(filteringKey.vlanId())
1780 ).findFirst().orElse(null);
1781 // we are interested to filt obj on the same device, same vlan and same ip type
1782 return key != null;
1783 }
1784
1785 /**
1786 * Update the filtering objective store upon device failure.
1787 *
1788 * @param affectedDevice the affected device
1789 */
1790 private void updateFilterObjStoreByDevice(DeviceId affectedDevice) {
1791 // purge the related filter objective key
1792 Set<McastFilteringObjStoreKey> filterObjs = Sets.newHashSet(mcastFilteringObjStore);
1793 Iterator<McastFilteringObjStoreKey> filterIterator = filterObjs.iterator();
1794 McastFilteringObjStoreKey filterKey;
1795 while (filterIterator.hasNext()) {
1796 filterKey = filterIterator.next();
1797 if (filterKey.ingressCP().deviceId().equals(affectedDevice)) {
1798 mcastFilteringObjStore.remove(filterKey);
1799 }
1800 }
1801 }
1802
1803 /**
1804 * Update the filtering objective store upon port failure.
1805 *
1806 * @param affectedPort the affected port
1807 */
1808 private void updateFilterObjStoreByPort(ConnectPoint affectedPort) {
1809 // purge the related filter objective key
1810 Set<McastFilteringObjStoreKey> filterObjs = Sets.newHashSet(mcastFilteringObjStore);
1811 Iterator<McastFilteringObjStoreKey> filterIterator = filterObjs.iterator();
1812 McastFilteringObjStoreKey filterKey;
1813 while (filterIterator.hasNext()) {
1814 filterKey = filterIterator.next();
1815 if (filterKey.ingressCP().equals(affectedPort)) {
1816 mcastFilteringObjStore.remove(filterKey);
1817 }
1818 }
1819 }
1820
1821 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +01001822 * Performs bucket verification operation for all mcast groups in the devices.
1823 * Firstly, it verifies that mcast is stable before trying verification operation.
1824 * Verification consists in creating new nexts with VERIFY operation. Actually,
1825 * the operation is totally delegated to the driver.
1826 */
Piere99511d2018-04-19 16:47:06 +02001827 private final class McastBucketCorrector implements Runnable {
pierc32ef422020-01-27 17:45:03 +01001828 private final AtomicInteger verifyOnFlight = new AtomicInteger(0);
1829 // Define the context used for the back pressure mechanism
1830 private final ObjectiveContext context = new DefaultObjectiveContext(
1831 (objective) -> {
1832 synchronized (verifyOnFlight) {
pier62e0b072019-12-23 19:21:49 +01001833 log.trace("Verify {} done", objective.id());
1834 verifyOnFlight.updateAndGet(i -> i > 0 ? i - 1 : i);
pierc32ef422020-01-27 17:45:03 +01001835 verifyOnFlight.notify();
1836 }
1837 },
1838 (objective, error) -> {
1839 synchronized (verifyOnFlight) {
pier62e0b072019-12-23 19:21:49 +01001840 log.trace("Verify {} error {}", objective.id(), error);
1841 verifyOnFlight.updateAndGet(i -> i > 0 ? i - 1 : i);
pierc32ef422020-01-27 17:45:03 +01001842 verifyOnFlight.notify();
1843 }
1844 });
1845
Pier Luigi35dab3f2018-01-25 16:16:02 +01001846 @Override
1847 public void run() {
Pier Luigi35dab3f2018-01-25 16:16:02 +01001848 try {
1849 // Iterates over the routes and verify the related next objectives
pierc32ef422020-01-27 17:45:03 +01001850 for (McastRoute mcastRoute : srManager.multicastRouteService.getRoutes()) {
pier62e0b072019-12-23 19:21:49 +01001851 if (!isMcastStable() || wasBktCorrRunning()) {
1852 return;
1853 }
pierc32ef422020-01-27 17:45:03 +01001854 IpAddress mcastIp = mcastRoute.group();
1855 log.trace("Running mcast buckets corrector for mcast group: {}", mcastIp);
1856 // Verify leadership on the operation
1857 if (!mcastUtils.isLeader(mcastIp)) {
1858 log.trace("Skip {} due to lack of leadership", mcastIp);
1859 continue;
1860 }
1861 // Get sources and sinks from Mcast Route Service and warn about errors
1862 Set<ConnectPoint> sources = mcastUtils.getSources(mcastIp);
1863 Set<ConnectPoint> sinks = mcastUtils.getSinks(mcastIp).values().stream()
1864 .flatMap(Collection::stream).collect(Collectors.toSet());
1865 // Do not proceed if sources of this group are missing
1866 if (sources.isEmpty()) {
1867 if (!sinks.isEmpty()) {
1868 log.warn("Unable to run buckets corrector. " +
1869 "Missing source {} for group {}", sources, mcastIp);
Piere99511d2018-04-19 16:47:06 +02001870 }
pierc32ef422020-01-27 17:45:03 +01001871 continue;
1872 }
1873 // For each group we get current information in the store
1874 // and issue a check of the next objectives in place
1875 Set<McastStoreKey> processedKeys = Sets.newHashSet();
1876 for (ConnectPoint source : sources) {
1877 Set<DeviceId> ingressDevices = getDevice(mcastIp, INGRESS, source);
1878 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
1879 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
1880 // Do not proceed if ingress devices are missing
1881 if (ingressDevices.isEmpty()) {
Pier Luigi92e69be2018-03-02 12:53:37 +01001882 if (!sinks.isEmpty()) {
1883 log.warn("Unable to run buckets corrector. " +
pierc32ef422020-01-27 17:45:03 +01001884 "Missing ingress {} for source {} and for group {}",
1885 ingressDevices, source, mcastIp);
Pier Luigi92e69be2018-03-02 12:53:37 +01001886 }
pierc32ef422020-01-27 17:45:03 +01001887 continue;
Pier Luigi35dab3f2018-01-25 16:16:02 +01001888 }
pierc32ef422020-01-27 17:45:03 +01001889 // Create the set of the devices to be processed
1890 ImmutableSet.Builder<DeviceId> devicesBuilder = ImmutableSet.builder();
pier62e0b072019-12-23 19:21:49 +01001891 devicesBuilder.addAll(ingressDevices);
pierc32ef422020-01-27 17:45:03 +01001892 if (!transitDevices.isEmpty()) {
1893 devicesBuilder.addAll(transitDevices);
1894 }
1895 if (!egressDevices.isEmpty()) {
1896 devicesBuilder.addAll(egressDevices);
1897 }
1898 Set<DeviceId> devicesToProcess = devicesBuilder.build();
1899 for (DeviceId deviceId : devicesToProcess) {
1900 if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
1901 log.trace("Skipping Bucket corrector for unconfigured device {}", deviceId);
Harshada Chaundkar9204f312019-07-02 16:01:24 +00001902 continue;
Pier Luigi35dab3f2018-01-25 16:16:02 +01001903 }
pierc32ef422020-01-27 17:45:03 +01001904 synchronized (verifyOnFlight) {
1905 while (verifyOnFlight.get() == MAX_VERIFY_ON_FLIGHT) {
1906 verifyOnFlight.wait();
Vignesh Ethiraj75790122019-08-26 12:18:42 +00001907 }
pierc32ef422020-01-27 17:45:03 +01001908 }
1909 VlanId assignedVlan = mcastUtils.assignedVlan(deviceId.equals(source.deviceId()) ?
1910 source : null);
1911 McastStoreKey currentKey = new McastStoreKey(mcastIp, deviceId, assignedVlan);
1912 // Check if we already processed this next - trees merge at some point
1913 if (processedKeys.contains(currentKey)) {
1914 continue;
1915 }
1916 // Verify the nextobjective or skip to next device
1917 if (mcastNextObjStore.containsKey(currentKey)) {
1918 NextObjective currentNext = mcastNextObjStore.get(currentKey).value();
1919 // Rebuild the next objective using assigned vlan
1920 currentNext = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
1921 mcastUtils.getPorts(currentNext.next()), currentNext.id()).verify(context);
1922 // Send to the flowobjective service
1923 srManager.flowObjectiveService.next(deviceId, currentNext);
1924 verifyOnFlight.incrementAndGet();
1925 log.trace("Verify on flight {}", verifyOnFlight);
1926 processedKeys.add(currentKey);
1927 } else {
1928 log.warn("Unable to run buckets corrector. " +
1929 "Missing next for {}, for source {} and for group {}",
1930 deviceId, source, mcastIp);
1931 }
1932 }
1933 }
pier62e0b072019-12-23 19:21:49 +01001934 // Let's wait the group before start the next one
1935 synchronized (verifyOnFlight) {
1936 while (verifyOnFlight.get() > 0) {
1937 verifyOnFlight.wait();
1938 }
1939 }
pierc32ef422020-01-27 17:45:03 +01001940 }
1941 } catch (InterruptedException e) {
1942 log.warn("BktCorr has been interrupted");
Pier Luigi35dab3f2018-01-25 16:16:02 +01001943 } finally {
pier62e0b072019-12-23 19:21:49 +01001944 lastBktCorrExecution.set(Instant.now());
Pier Luigi35dab3f2018-01-25 16:16:02 +01001945 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001946 }
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001947 }
Pier Luigi0f9635b2018-01-15 18:06:43 +01001948
Piere99511d2018-04-19 16:47:06 +02001949 /**
1950 * Returns the associated next ids to the mcast groups or to the single
1951 * group if mcastIp is present.
1952 *
1953 * @param mcastIp the group ip
1954 * @return the mapping mcastIp-device to next id
1955 */
Charles Chan0b1dd7e2018-08-19 19:21:46 -07001956 public Map<McastStoreKey, Integer> getNextIds(IpAddress mcastIp) {
Pier Luigi0f9635b2018-01-15 18:06:43 +01001957 if (mcastIp != null) {
1958 return mcastNextObjStore.entrySet().stream()
1959 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
Piere99511d2018-04-19 16:47:06 +02001960 .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().value().id()));
Pier Luigi0f9635b2018-01-15 18:06:43 +01001961 }
Pier Luigi0f9635b2018-01-15 18:06:43 +01001962 return mcastNextObjStore.entrySet().stream()
Piere99511d2018-04-19 16:47:06 +02001963 .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().value().id()));
Pier Luigi0f9635b2018-01-15 18:06:43 +01001964 }
1965
Pier71c55772018-04-17 17:25:22 +02001966 /**
Charles Chan0b1dd7e2018-08-19 19:21:46 -07001967 * Removes given next ID from mcast next id store.
1968 *
1969 * @param nextId next id
1970 */
1971 public void removeNextId(int nextId) {
1972 mcastNextObjStore.entrySet().forEach(e -> {
1973 if (e.getValue().value().id() == nextId) {
1974 mcastNextObjStore.remove(e.getKey());
1975 }
1976 });
1977 }
1978
1979 /**
Piere99511d2018-04-19 16:47:06 +02001980 * Returns the associated roles to the mcast groups.
1981 *
1982 * @param mcastIp the group ip
1983 * @param sourcecp the source connect point
1984 * @return the mapping mcastIp-device to mcast role
1985 */
Charles Chanba59dd62018-05-10 22:19:49 +00001986 public Map<McastRoleStoreKey, McastRole> getMcastRoles(IpAddress mcastIp,
1987 ConnectPoint sourcecp) {
Piere99511d2018-04-19 16:47:06 +02001988 if (mcastIp != null) {
1989 Map<McastRoleStoreKey, McastRole> roles = mcastRoleStore.entrySet().stream()
1990 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
1991 .collect(Collectors.toMap(entry -> new McastRoleStoreKey(entry.getKey().mcastIp(),
1992 entry.getKey().deviceId(), entry.getKey().source()), entry -> entry.getValue().value()));
1993 if (sourcecp != null) {
1994 roles = roles.entrySet().stream()
1995 .filter(mcastEntry -> sourcecp.equals(mcastEntry.getKey().source()))
1996 .collect(Collectors.toMap(entry -> new McastRoleStoreKey(entry.getKey().mcastIp(),
1997 entry.getKey().deviceId(), entry.getKey().source()), Entry::getValue));
1998 }
1999 return roles;
2000 }
2001 return mcastRoleStore.entrySet().stream()
2002 .collect(Collectors.toMap(entry -> new McastRoleStoreKey(entry.getKey().mcastIp(),
2003 entry.getKey().deviceId(), entry.getKey().source()), entry -> entry.getValue().value()));
2004 }
2005
Pier71c55772018-04-17 17:25:22 +02002006 /**
2007 * Returns the associated trees to the mcast group.
2008 *
2009 * @param mcastIp the group ip
2010 * @param sourcecp the source connect point
2011 * @return the mapping egress point to mcast path
2012 */
Charles Chanba59dd62018-05-10 22:19:49 +00002013 public Multimap<ConnectPoint, List<ConnectPoint>> getMcastTrees(IpAddress mcastIp,
2014 ConnectPoint sourcecp) {
Pier71c55772018-04-17 17:25:22 +02002015 Multimap<ConnectPoint, List<ConnectPoint>> mcastTrees = HashMultimap.create();
Pier71c55772018-04-17 17:25:22 +02002016 Set<ConnectPoint> sources = mcastUtils.getSources(mcastIp);
Pier71c55772018-04-17 17:25:22 +02002017 if (sourcecp != null) {
2018 sources = sources.stream()
Piere99511d2018-04-19 16:47:06 +02002019 .filter(source -> source.equals(sourcecp)).collect(Collectors.toSet());
Pier71c55772018-04-17 17:25:22 +02002020 }
Pier71c55772018-04-17 17:25:22 +02002021 if (!sources.isEmpty()) {
2022 sources.forEach(source -> {
Pier71c55772018-04-17 17:25:22 +02002023 Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
2024 Set<DeviceId> visited = Sets.newHashSet();
2025 List<ConnectPoint> currentPath = Lists.newArrayList(source);
Charles Chan0b1dd7e2018-08-19 19:21:46 -07002026 mcastUtils.buildMcastPaths(mcastNextObjStore.asJavaMap(), source.deviceId(), visited, mcastPaths,
2027 currentPath, mcastIp, source);
Pier71c55772018-04-17 17:25:22 +02002028 mcastPaths.forEach(mcastTrees::put);
2029 });
2030 }
2031 return mcastTrees;
2032 }
2033
2034 /**
Pierdb27b8d2018-04-17 16:29:56 +02002035 * Return the leaders of the mcast groups.
2036 *
2037 * @param mcastIp the group ip
2038 * @return the mapping group-node
2039 */
2040 public Map<IpAddress, NodeId> getMcastLeaders(IpAddress mcastIp) {
2041 return mcastUtils.getMcastLeaders(mcastIp);
2042 }
Harshada Chaundkar9204f312019-07-02 16:01:24 +00002043
2044 /**
2045 * Returns the mcast filtering obj.
2046 *
2047 * @return the mapping group-node
2048 */
2049 public Map<DeviceId, List<McastFilteringObjStoreKey>> getMcastFilters() {
2050 Map<DeviceId, List<McastFilteringObjStoreKey>> mapping = Maps.newHashMap();
2051 Set<McastFilteringObjStoreKey> currentKeys = Sets.newHashSet(mcastFilteringObjStore);
2052 currentKeys.forEach(filteringObjStoreKey ->
2053 mapping.compute(filteringObjStoreKey.ingressCP().deviceId(), (k, v) -> {
2054 List<McastFilteringObjStoreKey> values = v;
2055 if (values == null) {
2056 values = Lists.newArrayList();
2057 }
2058 values.add(filteringObjStoreKey);
2059 return values;
2060 })
2061 );
2062 return mapping;
2063 }
Charles Chanc91c8782016-03-30 17:54:24 -07002064}