blob: 8c4968ab060cb07d6b54d3d6a9aeb839afed7017 [file] [log] [blame]
Charles Chanc91c8782016-03-30 17:54:24 -07001/*
Pier Luigi69f774d2018-02-28 12:10:50 +01002 * Copyright 2018-present Open Networking Foundation
Charles Chanc91c8782016-03-30 17:54:24 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Pier Luigi69f774d2018-02-28 12:10:50 +010017package org.onosproject.segmentrouting.mcast;
Charles Chanc91c8782016-03-30 17:54:24 -070018
Piere99511d2018-04-19 16:47:06 +020019import com.google.common.base.Objects;
Pier71c55772018-04-17 17:25:22 +020020import com.google.common.collect.HashMultimap;
Charles Chanc91c8782016-03-30 17:54:24 -070021import com.google.common.collect.ImmutableSet;
22import com.google.common.collect.Lists;
Pier Luigi91573e12018-01-23 16:06:38 +010023import com.google.common.collect.Maps;
Pier71c55772018-04-17 17:25:22 +020024import com.google.common.collect.Multimap;
Charles Chanc91c8782016-03-30 17:54:24 -070025import com.google.common.collect.Sets;
Charles Chanc91c8782016-03-30 17:54:24 -070026import org.onlab.packet.IpAddress;
Charles Chanc91c8782016-03-30 17:54:24 -070027import org.onlab.packet.VlanId;
28import org.onlab.util.KryoNamespace;
Pierdb27b8d2018-04-17 16:29:56 +020029import org.onosproject.cluster.NodeId;
Charles Chanc91c8782016-03-30 17:54:24 -070030import org.onosproject.core.ApplicationId;
31import org.onosproject.core.CoreService;
Pier1f87aca2018-03-14 16:47:32 -070032import org.onosproject.mcast.api.McastEvent;
33import org.onosproject.mcast.api.McastRoute;
Pier7b657162018-03-27 11:29:42 -070034import org.onosproject.mcast.api.McastRouteData;
Pier1f87aca2018-03-14 16:47:32 -070035import org.onosproject.mcast.api.McastRouteUpdate;
Harshada Chaundkar9204f312019-07-02 16:01:24 +000036import org.onosproject.net.Device;
Charles Chanba59dd62018-05-10 22:19:49 +000037import org.onosproject.net.HostId;
Charles Chanc91c8782016-03-30 17:54:24 -070038import org.onosproject.net.ConnectPoint;
39import org.onosproject.net.DeviceId;
40import org.onosproject.net.Link;
41import org.onosproject.net.Path;
Harshada Chaundkar9204f312019-07-02 16:01:24 +000042import org.onosproject.net.Port;
Charles Chanc91c8782016-03-30 17:54:24 -070043import org.onosproject.net.PortNumber;
Charles Chan72779502016-04-23 17:36:10 -070044import org.onosproject.net.flowobjective.DefaultObjectiveContext;
Charles Chanc91c8782016-03-30 17:54:24 -070045import org.onosproject.net.flowobjective.ForwardingObjective;
46import org.onosproject.net.flowobjective.NextObjective;
Charles Chan72779502016-04-23 17:36:10 -070047import org.onosproject.net.flowobjective.ObjectiveContext;
Pier Luigi69f774d2018-02-28 12:10:50 +010048import org.onosproject.segmentrouting.SegmentRoutingManager;
Charles Chanc91c8782016-03-30 17:54:24 -070049import org.onosproject.store.serializers.KryoNamespaces;
50import org.onosproject.store.service.ConsistentMap;
pier9e02ab72020-02-12 20:40:55 +010051import org.onosproject.store.service.ConsistentMultimap;
Harshada Chaundkar9204f312019-07-02 16:01:24 +000052import org.onosproject.store.service.DistributedSet;
Charles Chanc91c8782016-03-30 17:54:24 -070053import org.onosproject.store.service.Serializer;
Andrea Campanella5b4cd652018-06-05 14:19:21 +020054import org.onosproject.store.service.Versioned;
Charles Chanc91c8782016-03-30 17:54:24 -070055import org.slf4j.Logger;
56import org.slf4j.LoggerFactory;
57
Pier Luigi35dab3f2018-01-25 16:16:02 +010058import java.time.Instant;
Charles Chanc91c8782016-03-30 17:54:24 -070059import java.util.Collection;
60import java.util.Collections;
Pier Luigi91573e12018-01-23 16:06:38 +010061import java.util.Comparator;
Harshada Chaundkar9204f312019-07-02 16:01:24 +000062import java.util.Iterator;
Charles Chanc91c8782016-03-30 17:54:24 -070063import java.util.List;
Charles Chan72779502016-04-23 17:36:10 -070064import java.util.Map;
Pier1f87aca2018-03-14 16:47:32 -070065import java.util.Map.Entry;
Charles Chanc91c8782016-03-30 17:54:24 -070066import java.util.Optional;
67import java.util.Set;
Pier Luigi35dab3f2018-01-25 16:16:02 +010068import java.util.concurrent.ScheduledExecutorService;
69import java.util.concurrent.TimeUnit;
pierc32ef422020-01-27 17:45:03 +010070import java.util.concurrent.atomic.AtomicInteger;
pier62e0b072019-12-23 19:21:49 +010071import java.util.concurrent.atomic.AtomicReference;
Charles Chan72779502016-04-23 17:36:10 -070072import java.util.stream.Collectors;
73
Pier Luigi35dab3f2018-01-25 16:16:02 +010074import static java.util.concurrent.Executors.newScheduledThreadPool;
75import static org.onlab.util.Tools.groupedThreads;
Charles Chanba59dd62018-05-10 22:19:49 +000076
Pierdb27b8d2018-04-17 16:29:56 +020077import static org.onosproject.mcast.api.McastEvent.Type.ROUTE_ADDED;
Pier7b657162018-03-27 11:29:42 -070078import static org.onosproject.mcast.api.McastEvent.Type.ROUTE_REMOVED;
Andrea Campanellaef30d7a2018-04-27 14:44:15 +020079import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_ADDED;
80import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_REMOVED;
Charles Chanba59dd62018-05-10 22:19:49 +000081import static org.onosproject.mcast.api.McastEvent.Type.SINKS_ADDED;
82import static org.onosproject.mcast.api.McastEvent.Type.SINKS_REMOVED;
83
Pier979e61a2018-03-07 11:42:50 +010084import static org.onosproject.segmentrouting.mcast.McastRole.EGRESS;
85import static org.onosproject.segmentrouting.mcast.McastRole.INGRESS;
86import static org.onosproject.segmentrouting.mcast.McastRole.TRANSIT;
Charles Chanc91c8782016-03-30 17:54:24 -070087
88/**
Pier Luigi69f774d2018-02-28 12:10:50 +010089 * Handles Multicast related events.
Charles Chanc91c8782016-03-30 17:54:24 -070090 */
Charles Chan1eaf4802016-04-18 13:44:03 -070091public class McastHandler {
pier62e0b072019-12-23 19:21:49 +010092 // Internal elements
Charles Chan1eaf4802016-04-18 13:44:03 -070093 private static final Logger log = LoggerFactory.getLogger(McastHandler.class);
Charles Chanc91c8782016-03-30 17:54:24 -070094 private final SegmentRoutingManager srManager;
Pierdb27b8d2018-04-17 16:29:56 +020095 private final McastUtils mcastUtils;
Charles Chan72779502016-04-23 17:36:10 -070096 private final ConsistentMap<McastStoreKey, NextObjective> mcastNextObjStore;
Piere99511d2018-04-19 16:47:06 +020097 private final ConsistentMap<McastRoleStoreKey, McastRole> mcastRoleStore;
pier9e02ab72020-02-12 20:40:55 +010098 private final ConsistentMultimap<McastPathStoreKey, List<Link>> mcastPathStore;
Harshada Chaundkar9204f312019-07-02 16:01:24 +000099 private final DistributedSet<McastFilteringObjStoreKey> mcastFilteringObjStore;
Pier Luigi35dab3f2018-01-25 16:16:02 +0100100 // Stability threshold for Mcast. Seconds
101 private static final long MCAST_STABLITY_THRESHOLD = 5;
Piere99511d2018-04-19 16:47:06 +0200102 // Verify interval for Mcast bucket corrector
Pier Luigi35dab3f2018-01-25 16:16:02 +0100103 private static final long MCAST_VERIFY_INTERVAL = 30;
pier62e0b072019-12-23 19:21:49 +0100104 // Max verify that can be processed at the same time
105 private static final int MAX_VERIFY_ON_FLIGHT = 10;
106 // Last change done
107 private AtomicReference<Instant> lastMcastChange = new AtomicReference<>(Instant.now());
108 // Last bucker corrector execution
109 private AtomicReference<Instant> lastBktCorrExecution = new AtomicReference<>(Instant.now());
110 // Executors for mcast bucket corrector and for the events
111 private ScheduledExecutorService mcastCorrector
112 = newScheduledThreadPool(1, groupedThreads("onos", "m-corrector", log));
113 private ScheduledExecutorService mcastWorker
114 = newScheduledThreadPool(1, groupedThreads("onos", "m-worker-%d", log));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100115
Charles Chan72779502016-04-23 17:36:10 -0700116 /**
Charles Chanc91c8782016-03-30 17:54:24 -0700117 * Constructs the McastEventHandler.
118 *
119 * @param srManager Segment Routing manager
120 */
Charles Chan1eaf4802016-04-18 13:44:03 -0700121 public McastHandler(SegmentRoutingManager srManager) {
Pier7b657162018-03-27 11:29:42 -0700122 ApplicationId coreAppId = srManager.coreService.getAppId(CoreService.CORE_APP_NAME);
Charles Chanc91c8782016-03-30 17:54:24 -0700123 this.srManager = srManager;
Pier7b657162018-03-27 11:29:42 -0700124 KryoNamespace.Builder mcastKryo = new KryoNamespace.Builder()
Charles Chanc91c8782016-03-30 17:54:24 -0700125 .register(KryoNamespaces.API)
Piere99511d2018-04-19 16:47:06 +0200126 .register(new McastStoreKeySerializer(), McastStoreKey.class);
Pier7b657162018-03-27 11:29:42 -0700127 mcastNextObjStore = srManager.storageService
Charles Chan72779502016-04-23 17:36:10 -0700128 .<McastStoreKey, NextObjective>consistentMapBuilder()
Charles Chanc91c8782016-03-30 17:54:24 -0700129 .withName("onos-mcast-nextobj-store")
Charles Chan4922a172016-05-23 16:45:45 -0700130 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-NextObj")))
Charles Chanc91c8782016-03-30 17:54:24 -0700131 .build();
Piere99511d2018-04-19 16:47:06 +0200132 mcastKryo = new KryoNamespace.Builder()
133 .register(KryoNamespaces.API)
134 .register(new McastRoleStoreKeySerializer(), McastRoleStoreKey.class)
135 .register(McastRole.class);
Pier7b657162018-03-27 11:29:42 -0700136 mcastRoleStore = srManager.storageService
Piere99511d2018-04-19 16:47:06 +0200137 .<McastRoleStoreKey, McastRole>consistentMapBuilder()
Charles Chan72779502016-04-23 17:36:10 -0700138 .withName("onos-mcast-role-store")
Charles Chan4922a172016-05-23 16:45:45 -0700139 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-Role")))
Charles Chan72779502016-04-23 17:36:10 -0700140 .build();
Harshada Chaundkar9204f312019-07-02 16:01:24 +0000141 mcastKryo = new KryoNamespace.Builder()
142 .register(KryoNamespaces.API)
143 .register(new McastFilteringObjStoreSerializer(), McastFilteringObjStoreKey.class);
144 mcastFilteringObjStore = srManager.storageService
145 .<McastFilteringObjStoreKey>setBuilder()
146 .withName("onos-mcast-filtering-store")
147 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-FilteringObj")))
148 .build()
149 .asDistributedSet();
pier9e02ab72020-02-12 20:40:55 +0100150 mcastKryo = new KryoNamespace.Builder()
151 .register(KryoNamespaces.API)
152 .register(new McastPathStoreKeySerializer(), McastPathStoreKey.class);
153 mcastPathStore = srManager.storageService
154 .<McastPathStoreKey, List<Link>>consistentMultimapBuilder()
155 .withName("onos-mcast-path-store")
156 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-Path")))
157 .build();
Pier7b657162018-03-27 11:29:42 -0700158 mcastUtils = new McastUtils(srManager, coreAppId, log);
pier62e0b072019-12-23 19:21:49 +0100159 // Init the executor for the buckets corrector
160 mcastCorrector.scheduleWithFixedDelay(new McastBucketCorrector(), 10,
161 MCAST_VERIFY_INTERVAL, TimeUnit.SECONDS);
162 }
163
164 /**
165 * Determines if mcast in the network has been stable in the last
166 * MCAST_STABLITY_THRESHOLD seconds, by comparing the current time
167 * to the last mcast change timestamp.
168 *
169 * @return true if stable
170 */
171 private boolean isMcastStable() {
172 long last = (long) (lastMcastChange.get().toEpochMilli() / 1000.0);
173 long now = (long) (Instant.now().toEpochMilli() / 1000.0);
174 log.trace("Multicast stable since {}s", now - last);
175 return (now - last) > MCAST_STABLITY_THRESHOLD;
176 }
177
178 /**
179 * Assures there are always MCAST_VERIFY_INTERVAL seconds between each execution,
180 * by comparing the current time with the last corrector execution.
181 *
182 * @return true if stable
183 */
184 private boolean wasBktCorrRunning() {
185 long last = (long) (lastBktCorrExecution.get().toEpochMilli() / 1000.0);
186 long now = (long) (Instant.now().toEpochMilli() / 1000.0);
187 log.trace("McastBucketCorrector executed {}s ago", now - last);
188 return (now - last) < MCAST_VERIFY_INTERVAL;
Charles Chan72779502016-04-23 17:36:10 -0700189 }
190
191 /**
Piere99511d2018-04-19 16:47:06 +0200192 * Read initial multicast configuration from mcast store.
Charles Chan72779502016-04-23 17:36:10 -0700193 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100194 public void init() {
pier62e0b072019-12-23 19:21:49 +0100195 mcastWorker.execute(this::initInternal);
196 }
197
198 private void initInternal() {
pier62e0b072019-12-23 19:21:49 +0100199 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
pier9e02ab72020-02-12 20:40:55 +0100200 lastMcastChange.set(Instant.now());
pier62e0b072019-12-23 19:21:49 +0100201 log.debug("Init group {}", mcastRoute.group());
202 if (!mcastUtils.isLeader(mcastRoute.group())) {
203 log.debug("Skip {} due to lack of leadership", mcastRoute.group());
204 return;
205 }
206 McastRouteData mcastRouteData = srManager.multicastRouteService.routeData(mcastRoute);
207 // For each source process the mcast tree
208 srManager.multicastRouteService.sources(mcastRoute).forEach(source -> {
pier9e02ab72020-02-12 20:40:55 +0100209 McastPathStoreKey pathStoreKey = new McastPathStoreKey(mcastRoute.group(), source);
210 Collection<? extends List<Link>> storedPaths = Versioned.valueOrElse(
211 mcastPathStore.get(pathStoreKey), Lists.newArrayList());
212 Map<ConnectPoint, List<ConnectPoint>> mcastPaths = buildMcastPaths(storedPaths, mcastRoute.group(),
213 source);
pier62e0b072019-12-23 19:21:49 +0100214 // Get all the sinks and process them
215 Set<ConnectPoint> sinks = processSinksToBeAdded(source, mcastRoute.group(),
216 mcastRouteData.sinks());
217 // Filter out all the working sinks, we do not want to move them
218 // TODO we need a better way to distinguish flows coming from different sources
219 sinks = sinks.stream()
220 .filter(sink -> !mcastPaths.containsKey(sink) ||
221 !isSinkForSource(mcastRoute.group(), sink, source))
222 .collect(Collectors.toSet());
223 if (sinks.isEmpty()) {
224 log.debug("Skip {} for source {} nothing to do", mcastRoute.group(), source);
Pierdb27b8d2018-04-17 16:29:56 +0200225 return;
226 }
pier9e02ab72020-02-12 20:40:55 +0100227 Map<ConnectPoint, List<Path>> mcasTree = mcastUtils.computeSinkMcastTree(mcastRoute.group(),
228 source.deviceId(), sinks);
229 mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink, mcastRoute.group(),
230 null));
Pier7b657162018-03-27 11:29:42 -0700231 });
pier62e0b072019-12-23 19:21:49 +0100232 });
Charles Chanc91c8782016-03-30 17:54:24 -0700233 }
234
235 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +0100236 * Clean up when deactivating the application.
237 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100238 public void terminate() {
pier62e0b072019-12-23 19:21:49 +0100239 mcastCorrector.shutdown();
240 mcastWorker.shutdown();
Pier72d0e582018-04-20 14:14:34 +0200241 mcastNextObjStore.destroy();
242 mcastRoleStore.destroy();
Harshada Chaundkar9204f312019-07-02 16:01:24 +0000243 mcastFilteringObjStore.destroy();
pier9e02ab72020-02-12 20:40:55 +0100244 mcastPathStore.destroy();
Pier72d0e582018-04-20 14:14:34 +0200245 mcastUtils.terminate();
246 log.info("Terminated");
Pier Luigi35dab3f2018-01-25 16:16:02 +0100247 }
248
249 /**
Pier Luigid29ca7c2018-02-28 17:24:03 +0100250 * Processes the SOURCE_ADDED, SOURCE_UPDATED, SINK_ADDED,
Piere99511d2018-04-19 16:47:06 +0200251 * SINK_REMOVED, ROUTE_ADDED and ROUTE_REMOVED events.
Charles Chanc91c8782016-03-30 17:54:24 -0700252 *
pier62e0b072019-12-23 19:21:49 +0100253 * @param event the multicast event to be processed
Charles Chanc91c8782016-03-30 17:54:24 -0700254 */
Pier Luigid29ca7c2018-02-28 17:24:03 +0100255 public void processMcastEvent(McastEvent event) {
pier62e0b072019-12-23 19:21:49 +0100256 mcastWorker.execute(() -> processMcastEventInternal(event));
257 }
258
259 private void processMcastEventInternal(McastEvent event) {
260 lastMcastChange.set(Instant.now());
261 // Current subject is null, for ROUTE_REMOVED events
262 final McastRouteUpdate mcastUpdate = event.subject();
263 final McastRouteUpdate mcastPrevUpdate = event.prevSubject();
264 IpAddress mcastIp = mcastPrevUpdate.route().group();
265 Set<ConnectPoint> prevSinks = mcastPrevUpdate.sinks()
266 .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
267 Set<ConnectPoint> prevSources = mcastPrevUpdate.sources()
268 .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
269 Set<ConnectPoint> sources;
270 // Events handling
Pierdb27b8d2018-04-17 16:29:56 +0200271 if (event.type() == ROUTE_ADDED) {
pier62e0b072019-12-23 19:21:49 +0100272 processRouteAddedInternal(mcastUpdate.route().group());
273 } else if (event.type() == ROUTE_REMOVED) {
274 processRouteRemovedInternal(prevSources, mcastIp);
275 } else if (event.type() == SOURCES_ADDED) {
276 // Current subject and prev just differ for the source connect points
277 sources = mcastUpdate.sources()
278 .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
279 Set<ConnectPoint> sourcesToBeAdded = Sets.difference(sources, prevSources);
280 processSourcesAddedInternal(sourcesToBeAdded, mcastIp, mcastUpdate.sinks());
281 } else if (event.type() == SOURCES_REMOVED) {
282 // Current subject and prev just differ for the source connect points
283 sources = mcastUpdate.sources()
284 .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
285 Set<ConnectPoint> sourcesToBeRemoved = Sets.difference(prevSources, sources);
286 processSourcesRemovedInternal(sourcesToBeRemoved, sources, mcastIp, mcastUpdate.sinks());
287 } else if (event.type() == SINKS_ADDED) {
288 processSinksAddedInternal(prevSources, mcastIp, mcastUpdate.sinks(), prevSinks);
289 } else if (event.type() == SINKS_REMOVED) {
290 processSinksRemovedInternal(prevSources, mcastIp, mcastUpdate.sinks(), mcastPrevUpdate.sinks());
Pierdb27b8d2018-04-17 16:29:56 +0200291 } else {
pier62e0b072019-12-23 19:21:49 +0100292 log.warn("Event {} not handled", event);
Pierdb27b8d2018-04-17 16:29:56 +0200293 }
Pier Luigi6786b922018-02-02 16:19:11 +0100294 }
295
296 /**
Piere99511d2018-04-19 16:47:06 +0200297 * Process the SOURCES_ADDED event.
298 *
299 * @param sources the sources connect point
300 * @param mcastIp the group address
301 * @param sinks the sinks connect points
302 */
303 private void processSourcesAddedInternal(Set<ConnectPoint> sources, IpAddress mcastIp,
304 Map<HostId, Set<ConnectPoint>> sinks) {
pier62e0b072019-12-23 19:21:49 +0100305 lastMcastChange.set(Instant.now());
306 log.info("Processing sources added {} for group {}", sources, mcastIp);
307 if (!mcastUtils.isLeader(mcastIp)) {
308 log.debug("Skip {} due to lack of leadership", mcastIp);
309 return;
Piere99511d2018-04-19 16:47:06 +0200310 }
pier62e0b072019-12-23 19:21:49 +0100311 if (sources.isEmpty()) {
312 log.debug("Skip {} due to empty sources to be added", mcastIp);
313 return;
314 }
315 sources.forEach(source -> {
316 Set<ConnectPoint> sinksToBeAdded = processSinksToBeAdded(source, mcastIp, sinks);
pier9e02ab72020-02-12 20:40:55 +0100317 Map<ConnectPoint, List<Path>> mcasTree = mcastUtils.computeSinkMcastTree(mcastIp, source.deviceId(),
318 sinksToBeAdded);
pier62e0b072019-12-23 19:21:49 +0100319 mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink, mcastIp, paths));
320 });
Piere99511d2018-04-19 16:47:06 +0200321 }
322
323 /**
324 * Process the SOURCES_REMOVED event.
325 *
326 * @param sourcesToBeRemoved the source connect points to be removed
327 * @param remainingSources the remainig source connect points
328 * @param mcastIp the group address
329 * @param sinks the sinks connect points
330 */
331 private void processSourcesRemovedInternal(Set<ConnectPoint> sourcesToBeRemoved,
332 Set<ConnectPoint> remainingSources,
333 IpAddress mcastIp,
334 Map<HostId, Set<ConnectPoint>> sinks) {
pier62e0b072019-12-23 19:21:49 +0100335 lastMcastChange.set(Instant.now());
336 log.info("Processing sources removed {} for group {}", sourcesToBeRemoved, mcastIp);
337 if (!mcastUtils.isLeader(mcastIp)) {
338 log.debug("Skip {} due to lack of leadership", mcastIp);
339 return;
340 }
341 if (remainingSources.isEmpty()) {
342 log.debug("There are no more sources for {}", mcastIp);
343 processRouteRemovedInternal(sourcesToBeRemoved, mcastIp);
344 return;
345 }
pier62e0b072019-12-23 19:21:49 +0100346 // Let's heal the trees
pier9e02ab72020-02-12 20:40:55 +0100347 Set<Link> notAffectedLinks = Sets.newHashSet();
348 Map<ConnectPoint, Set<Link>> affectedLinks = Maps.newHashMap();
pier62e0b072019-12-23 19:21:49 +0100349 Map<ConnectPoint, Set<ConnectPoint>> candidateSinks = Maps.newHashMap();
pier9e02ab72020-02-12 20:40:55 +0100350 Set<ConnectPoint> totalSources = Sets.newHashSet(sourcesToBeRemoved);
pier62e0b072019-12-23 19:21:49 +0100351 totalSources.addAll(remainingSources);
pier9e02ab72020-02-12 20:40:55 +0100352 // Calculate all the links used by the sources and the current sinks
pier62e0b072019-12-23 19:21:49 +0100353 totalSources.forEach(source -> {
354 Set<ConnectPoint> currentSinks = sinks.values()
355 .stream().flatMap(Collection::stream)
356 .filter(sink -> isSinkForSource(mcastIp, sink, source))
Piere99511d2018-04-19 16:47:06 +0200357 .collect(Collectors.toSet());
pier62e0b072019-12-23 19:21:49 +0100358 candidateSinks.put(source, currentSinks);
pier9e02ab72020-02-12 20:40:55 +0100359 McastPathStoreKey pathStoreKey = new McastPathStoreKey(mcastIp, source);
360 Collection<? extends List<Link>> storedPaths = Versioned.valueOrElse(
361 mcastPathStore.get(pathStoreKey), Lists.newArrayList());
pier62e0b072019-12-23 19:21:49 +0100362 currentSinks.forEach(currentSink -> {
pier9e02ab72020-02-12 20:40:55 +0100363 Optional<? extends List<Link>> currentPath = mcastUtils.getStoredPath(currentSink.deviceId(),
364 storedPaths);
pier62e0b072019-12-23 19:21:49 +0100365 if (currentPath.isPresent()) {
pier9e02ab72020-02-12 20:40:55 +0100366 if (!sourcesToBeRemoved.contains(source)) {
367 notAffectedLinks.addAll(currentPath.get());
pier62e0b072019-12-23 19:21:49 +0100368 } else {
pier9e02ab72020-02-12 20:40:55 +0100369 affectedLinks.compute(source, (k, v) -> {
370 v = v == null ? Sets.newHashSet() : v;
371 v.addAll(currentPath.get());
372 return v;
373 });
Piere99511d2018-04-19 16:47:06 +0200374 }
Piere99511d2018-04-19 16:47:06 +0200375 }
376 });
pier62e0b072019-12-23 19:21:49 +0100377 });
378 // Clean transit links
pier9e02ab72020-02-12 20:40:55 +0100379 affectedLinks.forEach((source, currentCandidateLinks) -> {
380 Set<Link> linksToBeRemoved = Sets.difference(currentCandidateLinks, notAffectedLinks)
pier62e0b072019-12-23 19:21:49 +0100381 .immutableCopy();
382 if (!linksToBeRemoved.isEmpty()) {
383 currentCandidateLinks.forEach(link -> {
384 DeviceId srcLink = link.src().deviceId();
385 // Remove ports only on links to be removed
386 if (linksToBeRemoved.contains(link)) {
387 removePortFromDevice(link.src().deviceId(), link.src().port(), mcastIp,
388 mcastUtils.assignedVlan(srcLink.equals(source.deviceId()) ?
389 source : null));
390 }
391 // Remove role on the candidate links
392 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, srcLink, source));
393 });
394 }
395 });
396 // Clean ingress and egress
pier9e02ab72020-02-12 20:40:55 +0100397 sourcesToBeRemoved.forEach(source -> {
pier62e0b072019-12-23 19:21:49 +0100398 Set<ConnectPoint> currentSinks = candidateSinks.get(source);
pier9e02ab72020-02-12 20:40:55 +0100399 McastPathStoreKey pathStoreKey = new McastPathStoreKey(mcastIp, source);
pier62e0b072019-12-23 19:21:49 +0100400 currentSinks.forEach(currentSink -> {
401 VlanId assignedVlan = mcastUtils.assignedVlan(source.deviceId().equals(currentSink.deviceId()) ?
402 source : null);
403 // Sinks co-located with the source
404 if (source.deviceId().equals(currentSink.deviceId())) {
405 if (source.port().equals(currentSink.port())) {
406 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
407 mcastIp, currentSink, source);
Piere99511d2018-04-19 16:47:06 +0200408 return;
409 }
pier62e0b072019-12-23 19:21:49 +0100410 // We need to check against the other sources and if it is
411 // necessary remove the port from the device - no overlap
Piere99511d2018-04-19 16:47:06 +0200412 Set<VlanId> otherVlans = remainingSources.stream()
pier62e0b072019-12-23 19:21:49 +0100413 // Only sources co-located and having this sink
414 .filter(remainingSource -> remainingSource.deviceId()
415 .equals(source.deviceId()) && candidateSinks.get(remainingSource)
Piere99511d2018-04-19 16:47:06 +0200416 .contains(currentSink))
417 .map(remainingSource -> mcastUtils.assignedVlan(
418 remainingSource.deviceId().equals(currentSink.deviceId()) ?
419 remainingSource : null)).collect(Collectors.toSet());
Piere99511d2018-04-19 16:47:06 +0200420 if (!otherVlans.contains(assignedVlan)) {
421 removePortFromDevice(currentSink.deviceId(), currentSink.port(),
422 mcastIp, assignedVlan);
423 }
424 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, currentSink.deviceId(),
425 source));
pier62e0b072019-12-23 19:21:49 +0100426 return;
427 }
428 Set<VlanId> otherVlans = remainingSources.stream()
429 .filter(remainingSource -> candidateSinks.get(remainingSource)
430 .contains(currentSink))
431 .map(remainingSource -> mcastUtils.assignedVlan(
432 remainingSource.deviceId().equals(currentSink.deviceId()) ?
433 remainingSource : null)).collect(Collectors.toSet());
434 // Sinks on other leaves
435 if (!otherVlans.contains(assignedVlan)) {
436 removePortFromDevice(currentSink.deviceId(), currentSink.port(),
437 mcastIp, assignedVlan);
438 }
439 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, currentSink.deviceId(),
440 source));
Piere99511d2018-04-19 16:47:06 +0200441 });
pier9e02ab72020-02-12 20:40:55 +0100442 // Clean the mcast paths
443 mcastPathStore.removeAll(pathStoreKey);
pier62e0b072019-12-23 19:21:49 +0100444 });
Piere99511d2018-04-19 16:47:06 +0200445 }
446
447 /**
Pierdb27b8d2018-04-17 16:29:56 +0200448 * Process the ROUTE_ADDED event.
Pier Luigie80d6b42018-02-26 12:31:38 +0100449 *
Pierdb27b8d2018-04-17 16:29:56 +0200450 * @param mcastIp the group address
Pier Luigie80d6b42018-02-26 12:31:38 +0100451 */
Pierdb27b8d2018-04-17 16:29:56 +0200452 private void processRouteAddedInternal(IpAddress mcastIp) {
pier62e0b072019-12-23 19:21:49 +0100453 lastMcastChange.set(Instant.now());
454 log.info("Processing route added for Multicast group {}", mcastIp);
455 // Just elect a new leader
456 mcastUtils.isLeader(mcastIp);
Pier Luigie80d6b42018-02-26 12:31:38 +0100457 }
458
459 /**
Pier Luigi6786b922018-02-02 16:19:11 +0100460 * Removes the entire mcast tree related to this group.
Piere99511d2018-04-19 16:47:06 +0200461 * @param sources the source connect points
Pier Luigi6786b922018-02-02 16:19:11 +0100462 * @param mcastIp multicast group IP address
463 */
Piere99511d2018-04-19 16:47:06 +0200464 private void processRouteRemovedInternal(Set<ConnectPoint> sources, IpAddress mcastIp) {
pier62e0b072019-12-23 19:21:49 +0100465 lastMcastChange.set(Instant.now());
466 log.info("Processing route removed for group {}", mcastIp);
467 if (!mcastUtils.isLeader(mcastIp)) {
468 log.debug("Skip {} due to lack of leadership", mcastIp);
Piere99511d2018-04-19 16:47:06 +0200469 mcastUtils.withdrawLeader(mcastIp);
pier62e0b072019-12-23 19:21:49 +0100470 return;
Pier Luigi6786b922018-02-02 16:19:11 +0100471 }
pier62e0b072019-12-23 19:21:49 +0100472 sources.forEach(source -> {
473 // Find out the ingress, transit and egress device of the affected group
474 DeviceId ingressDevice = getDevice(mcastIp, INGRESS, source)
475 .stream().findFirst().orElse(null);
476 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
477 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
pier9e02ab72020-02-12 20:40:55 +0100478 McastPathStoreKey pathStoreKey = new McastPathStoreKey(mcastIp, source);
pier62e0b072019-12-23 19:21:49 +0100479 // If there are no egress and transit devices, sinks could be only on the ingress
480 if (!egressDevices.isEmpty()) {
481 egressDevices.forEach(deviceId -> {
482 removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null));
483 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, deviceId, source));
484 });
485 }
486 if (!transitDevices.isEmpty()) {
487 transitDevices.forEach(deviceId -> {
488 removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null));
489 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, deviceId, source));
490 });
491 }
492 if (ingressDevice != null) {
493 removeGroupFromDevice(ingressDevice, mcastIp, mcastUtils.assignedVlan(source));
494 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, ingressDevice, source));
495 }
pier9e02ab72020-02-12 20:40:55 +0100496 // Clean the mcast paths
497 mcastPathStore.removeAll(pathStoreKey);
pier62e0b072019-12-23 19:21:49 +0100498 });
499 // Finally, withdraw the leadership
500 mcastUtils.withdrawLeader(mcastIp);
Pier Luigi6786b922018-02-02 16:19:11 +0100501 }
502
Pier7b657162018-03-27 11:29:42 -0700503 /**
504 * Process sinks to be removed.
505 *
Piere99511d2018-04-19 16:47:06 +0200506 * @param sources the source connect points
Pier7b657162018-03-27 11:29:42 -0700507 * @param mcastIp the ip address of the group
508 * @param newSinks the new sinks to be processed
Pier28164682018-04-17 15:50:43 +0200509 * @param prevSinks the previous sinks
Pier7b657162018-03-27 11:29:42 -0700510 */
Piere99511d2018-04-19 16:47:06 +0200511 private void processSinksRemovedInternal(Set<ConnectPoint> sources, IpAddress mcastIp,
Pier7b657162018-03-27 11:29:42 -0700512 Map<HostId, Set<ConnectPoint>> newSinks,
Pier28164682018-04-17 15:50:43 +0200513 Map<HostId, Set<ConnectPoint>> prevSinks) {
pier62e0b072019-12-23 19:21:49 +0100514 lastMcastChange.set(Instant.now());
515 log.info("Processing sinks removed for group {} and for sources {}",
516 mcastIp, sources);
517 if (!mcastUtils.isLeader(mcastIp)) {
518 log.debug("Skip {} due to lack of leadership", mcastIp);
519 return;
Pier7b657162018-03-27 11:29:42 -0700520 }
pier9e02ab72020-02-12 20:40:55 +0100521 Map<ConnectPoint, Map<ConnectPoint, Optional<? extends List<Link>>>> treesToBeRemoved = Maps.newHashMap();
pier62e0b072019-12-23 19:21:49 +0100522 Map<ConnectPoint, Set<ConnectPoint>> treesToBeAdded = Maps.newHashMap();
pier9e02ab72020-02-12 20:40:55 +0100523 Set<Link> goodLinks = Sets.newHashSet();
524 Map<ConnectPoint, Set<DeviceId>> goodDevicesBySource = Maps.newHashMap();
pier62e0b072019-12-23 19:21:49 +0100525 sources.forEach(source -> {
526 // Save the path associated to the sinks to be removed
pier9e02ab72020-02-12 20:40:55 +0100527 Set<ConnectPoint> sinksToBeRemoved = processSinksToBeRemoved(mcastIp, prevSinks,
pier62e0b072019-12-23 19:21:49 +0100528 newSinks, source);
pier9e02ab72020-02-12 20:40:55 +0100529 Map<ConnectPoint, Optional<? extends List<Link>>> treeToBeRemoved = Maps.newHashMap();
530 McastPathStoreKey pathStoreKey = new McastPathStoreKey(mcastIp, source);
531 Collection<? extends List<Link>> storedPaths = Versioned.valueOrElse(
532 mcastPathStore.get(pathStoreKey), Lists.newArrayList());
533 sinksToBeRemoved.forEach(sink -> treeToBeRemoved.put(sink, mcastUtils.getStoredPath(sink.deviceId(),
534 storedPaths)));
pier62e0b072019-12-23 19:21:49 +0100535 treesToBeRemoved.put(source, treeToBeRemoved);
pier9e02ab72020-02-12 20:40:55 +0100536 // Save the good links and good devices
537 Set<DeviceId> goodDevices = Sets.newHashSet();
538 Set<DeviceId> totalDevices = Sets.newHashSet(getDevice(mcastIp, EGRESS, source));
539 totalDevices.addAll(getDevice(mcastIp, INGRESS, source));
540 Set<ConnectPoint> notAffectedSinks = Sets.newHashSet();
541 // Compute good sinks
542 totalDevices.forEach(device -> {
543 Set<ConnectPoint> sinks = getSinks(mcastIp, device, source);
544 notAffectedSinks.addAll(Sets.difference(sinks, sinksToBeRemoved));
545 });
546 // Compute good paths and good devices
547 notAffectedSinks.forEach(notAffectedSink -> {
548 Optional<? extends List<Link>> notAffectedPath = mcastUtils.getStoredPath(notAffectedSink.deviceId(),
549 storedPaths);
550 if (notAffectedPath.isPresent()) {
551 List<Link> goodPath = notAffectedPath.get();
552 goodLinks.addAll(goodPath);
553 goodPath.forEach(link -> goodDevices.add(link.src().deviceId()));
554 } else {
555 goodDevices.add(notAffectedSink.deviceId());
556 }
557 });
558 goodDevicesBySource.compute(source, (k, v) -> {
559 v = v == null ? Sets.newHashSet() : v;
560 v.addAll(goodDevices);
561 return v;
562 });
pier62e0b072019-12-23 19:21:49 +0100563 // Recover the dual-homed sinks
564 Set<ConnectPoint> sinksToBeRecovered = processSinksToBeRecovered(mcastIp, newSinks,
565 prevSinks, source);
566 treesToBeAdded.put(source, sinksToBeRecovered);
567 });
568 // Remove the sinks taking into account the multiple sources and the original paths
569 treesToBeRemoved.forEach((source, tree) ->
pier9e02ab72020-02-12 20:40:55 +0100570 tree.forEach((sink, path) -> processSinkRemovedInternal(source, sink, mcastIp, path,
571 goodLinks, goodDevicesBySource.get(source))));
pier62e0b072019-12-23 19:21:49 +0100572 // Add new sinks according to the recovery procedure
573 treesToBeAdded.forEach((source, sinks) ->
574 sinks.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null)));
Pier7b657162018-03-27 11:29:42 -0700575 }
576
Pier Luigi6786b922018-02-02 16:19:11 +0100577 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +0100578 * Removes a path from source to sink for given multicast group.
579 *
580 * @param source connect point of the multicast source
581 * @param sink connection point of the multicast sink
582 * @param mcastIp multicast group IP address
Piere99511d2018-04-19 16:47:06 +0200583 * @param mcastPath path associated to the sink
pier9e02ab72020-02-12 20:40:55 +0100584 * @param usedLinks links used by the other sinks
585 * @param usedDevices devices used by other sinks
Pier Luigi35dab3f2018-01-25 16:16:02 +0100586 */
587 private void processSinkRemovedInternal(ConnectPoint source, ConnectPoint sink,
pier9e02ab72020-02-12 20:40:55 +0100588 IpAddress mcastIp, Optional<? extends List<Link>> mcastPath,
589 Set<Link> usedLinks, Set<DeviceId> usedDevices) {
590
591 log.info("Used links {}", usedLinks);
592 log.info("Used devices {}", usedDevices);
593
pier62e0b072019-12-23 19:21:49 +0100594 lastMcastChange.set(Instant.now());
595 log.info("Processing sink removed {} for group {} and for source {}", sink, mcastIp, source);
596 boolean isLast;
597 // When source and sink are on the same device
598 if (source.deviceId().equals(sink.deviceId())) {
599 // Source and sink are on even the same port. There must be something wrong.
600 if (source.port().equals(sink.port())) {
601 log.warn("Skip {} since sink {} is on the same port of source {}. Abort", mcastIp, sink, source);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100602 return;
603 }
pier62e0b072019-12-23 19:21:49 +0100604 isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100605 if (isLast) {
Piere99511d2018-04-19 16:47:06 +0200606 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, sink.deviceId(), source));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100607 }
pier62e0b072019-12-23 19:21:49 +0100608 return;
609 }
610 // Process the egress device
611 isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
612 if (isLast) {
613 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, sink.deviceId(), source));
614 }
615 // If this is the last sink on the device, also update upstream
616 if (mcastPath.isPresent()) {
pier9e02ab72020-02-12 20:40:55 +0100617 List<Link> links = Lists.newArrayList(mcastPath.get());
618 if (isLast) {
619 // Clean the path
620 McastPathStoreKey pathStoreKey = new McastPathStoreKey(mcastIp, source);
621 mcastPathStore.remove(pathStoreKey, mcastPath.get());
622 Collections.reverse(links);
623 for (Link link : links) {
624 // If nobody is using the port remove
625 if (!usedLinks.contains(link)) {
626 removePortFromDevice(link.src().deviceId(), link.src().port(), mcastIp,
627 mcastUtils.assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
628 }
629 // If nobody is using the device
630 if (!usedDevices.contains(link.src().deviceId())) {
pier62e0b072019-12-23 19:21:49 +0100631 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, link.src().deviceId(), source));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100632 }
Charles Chanc91c8782016-03-30 17:54:24 -0700633 }
634 }
635 }
636 }
637
Pier7b657162018-03-27 11:29:42 -0700638 /**
639 * Process sinks to be added.
640 *
Piere99511d2018-04-19 16:47:06 +0200641 * @param sources the source connect points
Pier7b657162018-03-27 11:29:42 -0700642 * @param mcastIp the group IP
643 * @param newSinks the new sinks to be processed
644 * @param allPrevSinks all previous sinks
645 */
Piere99511d2018-04-19 16:47:06 +0200646 private void processSinksAddedInternal(Set<ConnectPoint> sources, IpAddress mcastIp,
Pier7b657162018-03-27 11:29:42 -0700647 Map<HostId, Set<ConnectPoint>> newSinks,
648 Set<ConnectPoint> allPrevSinks) {
pier62e0b072019-12-23 19:21:49 +0100649 lastMcastChange.set(Instant.now());
650 log.info("Processing sinks added for group {} and for sources {}", mcastIp, sources);
651 if (!mcastUtils.isLeader(mcastIp)) {
652 log.debug("Skip {} due to lack of leadership", mcastIp);
653 return;
Pier7b657162018-03-27 11:29:42 -0700654 }
pier62e0b072019-12-23 19:21:49 +0100655 sources.forEach(source -> {
656 Set<ConnectPoint> sinksToBeAdded = processSinksToBeAdded(source, mcastIp, newSinks);
657 sinksToBeAdded = Sets.difference(sinksToBeAdded, allPrevSinks);
658 sinksToBeAdded.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null));
659 });
Pier7b657162018-03-27 11:29:42 -0700660 }
661
Charles Chanc91c8782016-03-30 17:54:24 -0700662 /**
663 * Establishes a path from source to sink for given multicast group.
664 *
665 * @param source connect point of the multicast source
666 * @param sink connection point of the multicast sink
667 * @param mcastIp multicast group IP address
668 */
669 private void processSinkAddedInternal(ConnectPoint source, ConnectPoint sink,
Pier7b657162018-03-27 11:29:42 -0700670 IpAddress mcastIp, List<Path> allPaths) {
pier62e0b072019-12-23 19:21:49 +0100671 lastMcastChange.set(Instant.now());
672 log.info("Processing sink added {} for group {} and for source {}", sink, mcastIp, source);
673 // Process the ingress device
674 McastFilteringObjStoreKey mcastFilterObjStoreKey = new McastFilteringObjStoreKey(source,
675 mcastUtils.assignedVlan(source), mcastIp.isIp4());
676 addFilterToDevice(mcastFilterObjStoreKey, mcastIp, INGRESS);
677 if (source.deviceId().equals(sink.deviceId())) {
678 if (source.port().equals(sink.port())) {
679 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
680 mcastIp, sink, source);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100681 return;
682 }
pier62e0b072019-12-23 19:21:49 +0100683 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
684 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, sink.deviceId(), source), INGRESS);
685 return;
686 }
687 // Find a path. If present, create/update groups and flows for each hop
pier9e02ab72020-02-12 20:40:55 +0100688 Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp, allPaths);
pier62e0b072019-12-23 19:21:49 +0100689 if (mcastPath.isPresent()) {
690 List<Link> links = mcastPath.get().links();
pier9e02ab72020-02-12 20:40:55 +0100691 McastPathStoreKey pathStoreKey = new McastPathStoreKey(mcastIp, source);
pier62e0b072019-12-23 19:21:49 +0100692 // Setup mcast role for ingress
693 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, source.deviceId(), source), INGRESS);
694 // Setup properly the transit forwarding
695 links.forEach(link -> {
696 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
697 mcastUtils.assignedVlan(link.src().deviceId()
698 .equals(source.deviceId()) ? source : null));
699 McastFilteringObjStoreKey filteringKey = new McastFilteringObjStoreKey(link.dst(),
700 mcastUtils.assignedVlan(null), mcastIp.isIp4());
701 addFilterToDevice(filteringKey, mcastIp, null);
702 });
703 // Setup mcast role for the transit
704 links.stream()
705 .filter(link -> !link.dst().deviceId().equals(sink.deviceId()))
pier9e02ab72020-02-12 20:40:55 +0100706 .forEach(link -> mcastRoleStore.put(new McastRoleStoreKey(mcastIp, link.dst().deviceId(),
707 source), TRANSIT));
pier62e0b072019-12-23 19:21:49 +0100708 // Process the egress device
709 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
710 // Setup mcast role for egress
711 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, sink.deviceId(), source), EGRESS);
pier9e02ab72020-02-12 20:40:55 +0100712 // Store the used path
713 mcastPathStore.put(pathStoreKey, links);
pier62e0b072019-12-23 19:21:49 +0100714 } else {
715 log.warn("Unable to find a path from {} to {}. Abort sinkAdded", source.deviceId(), sink.deviceId());
Charles Chanc91c8782016-03-30 17:54:24 -0700716 }
717 }
718
719 /**
pier62e0b072019-12-23 19:21:49 +0100720 * Processes the PORT_UPDATED event.
Harshada Chaundkar9204f312019-07-02 16:01:24 +0000721 *
722 * @param affectedDevice Affected device
723 * @param affectedPort Affected port
724 */
725 public void processPortUpdate(Device affectedDevice, Port affectedPort) {
pier62e0b072019-12-23 19:21:49 +0100726 mcastWorker.execute(() -> processPortUpdateInternal(affectedDevice, affectedPort));
727 }
728
729 private void processPortUpdateInternal(Device affectedDevice, Port affectedPort) {
Harshada Chaundkar9204f312019-07-02 16:01:24 +0000730 // Clean the filtering obj store. Edge port case.
pier62e0b072019-12-23 19:21:49 +0100731 lastMcastChange.set(Instant.now());
Harshada Chaundkar9204f312019-07-02 16:01:24 +0000732 ConnectPoint portDown = new ConnectPoint(affectedDevice.id(), affectedPort.number());
733 if (!affectedPort.isEnabled()) {
pier62e0b072019-12-23 19:21:49 +0100734 log.info("Processing port down {}", portDown);
Harshada Chaundkar9204f312019-07-02 16:01:24 +0000735 updateFilterObjStoreByPort(portDown);
736 }
737 }
738
739 /**
Charles Chan72779502016-04-23 17:36:10 -0700740 * Processes the LINK_DOWN event.
741 *
piereaddb182020-02-03 13:50:53 +0100742 * @param linkDown Link that is going down
Charles Chan72779502016-04-23 17:36:10 -0700743 */
piereaddb182020-02-03 13:50:53 +0100744 public void processLinkDown(Link linkDown) {
pier62e0b072019-12-23 19:21:49 +0100745 mcastWorker.execute(() -> processLinkDownInternal(linkDown));
746 }
747
748 private void processLinkDownInternal(Link linkDown) {
pier62e0b072019-12-23 19:21:49 +0100749 // Get mcast groups affected by the link going down
750 Set<IpAddress> affectedGroups = getAffectedGroups(linkDown);
751 log.info("Processing link down {} for groups {}", linkDown, affectedGroups);
752 affectedGroups.forEach(mcastIp -> {
pier9e02ab72020-02-12 20:40:55 +0100753 lastMcastChange.set(Instant.now());
pier62e0b072019-12-23 19:21:49 +0100754 log.debug("Processing link down {} for group {}", linkDown, mcastIp);
755 recoverFailure(mcastIp, linkDown);
756 });
Charles Chan72779502016-04-23 17:36:10 -0700757 }
758
759 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +0100760 * Process the DEVICE_DOWN event.
761 *
762 * @param deviceDown device going down
763 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100764 public void processDeviceDown(DeviceId deviceDown) {
pier62e0b072019-12-23 19:21:49 +0100765 mcastWorker.execute(() -> processDeviceDownInternal(deviceDown));
766 }
767
768 private void processDeviceDownInternal(DeviceId deviceDown) {
pier62e0b072019-12-23 19:21:49 +0100769 // Get the mcast groups affected by the device going down
770 Set<IpAddress> affectedGroups = getAffectedGroups(deviceDown);
771 log.info("Processing device down {} for groups {}", deviceDown, affectedGroups);
772 updateFilterObjStoreByDevice(deviceDown);
773 affectedGroups.forEach(mcastIp -> {
pier9e02ab72020-02-12 20:40:55 +0100774 lastMcastChange.set(Instant.now());
pier62e0b072019-12-23 19:21:49 +0100775 log.debug("Processing device down {} for group {}", deviceDown, mcastIp);
776 recoverFailure(mcastIp, deviceDown);
777 });
Pier Luigi580fd8a2018-01-16 10:47:50 +0100778 }
779
780 /**
Piere99511d2018-04-19 16:47:06 +0200781 * General failure recovery procedure.
782 *
783 * @param mcastIp the group to recover
784 * @param failedElement the failed element
785 */
786 private void recoverFailure(IpAddress mcastIp, Object failedElement) {
pier9e02ab72020-02-12 20:40:55 +0100787 // Do not proceed if we are not the leaders
Piere99511d2018-04-19 16:47:06 +0200788 if (!mcastUtils.isLeader(mcastIp)) {
789 log.debug("Skip {} due to lack of leadership", mcastIp);
790 return;
791 }
pier9e02ab72020-02-12 20:40:55 +0100792 // Skip if it is not an infra failure
793 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT);
794 if (!mcastUtils.isInfraFailure(transitDevices, failedElement)) {
795 log.debug("Skip {} not an infrastructure failure", mcastIp);
796 return;
797 }
Piere99511d2018-04-19 16:47:06 +0200798 // Do not proceed if the sources of this group are missing
799 Set<ConnectPoint> sources = getSources(mcastIp);
800 if (sources.isEmpty()) {
801 log.warn("Missing sources for group {}", mcastIp);
802 return;
803 }
pier9e02ab72020-02-12 20:40:55 +0100804 // Get all the paths, affected paths, good links and good devices
805 Set<List<Link>> storedPaths = getStoredPaths(mcastIp);
806 Set<List<Link>> affectedPaths = mcastUtils.getAffectedPaths(storedPaths, failedElement);
807 Set<Link> goodLinks = Sets.newHashSet();
808 Map<DeviceId, Set<DeviceId>> goodDevicesBySource = Maps.newHashMap();
809 Map<DeviceId, Set<ConnectPoint>> processedSourcesByEgress = Maps.newHashMap();
810 Sets.difference(storedPaths, affectedPaths).forEach(goodPath -> {
811 goodLinks.addAll(goodPath);
812 DeviceId srcDevice = goodPath.get(0).src().deviceId();
813 Set<DeviceId> goodDevices = Sets.newHashSet();
814 goodPath.forEach(link -> goodDevices.add(link.src().deviceId()));
815 goodDevicesBySource.compute(srcDevice, (k, v) -> {
816 v = v == null ? Sets.newHashSet() : v;
817 v.addAll(goodDevices);
818 return v;
Piere99511d2018-04-19 16:47:06 +0200819 });
820 });
pier9e02ab72020-02-12 20:40:55 +0100821 affectedPaths.forEach(affectedPath -> {
822 // TODO remove
823 log.info("Good links {}", goodLinks);
824 // TODO remove
825 log.info("Good devices {}", goodDevicesBySource);
826 // TODO trace
827 log.info("Healing the path {}", affectedPath);
828 DeviceId srcDevice = affectedPath.get(0).src().deviceId();
829 DeviceId dstDevice = affectedPath.get(affectedPath.size() - 1).dst().deviceId();
830 // Fix in one shot multiple sources
831 Set<ConnectPoint> affectedSources = sources.stream()
832 .filter(device -> device.deviceId().equals(srcDevice))
833 .collect(Collectors.toSet());
834 Set<ConnectPoint> processedSources = processedSourcesByEgress.getOrDefault(dstDevice,
835 Collections.emptySet());
836 Optional<Path> alternativePath = getPath(srcDevice, dstDevice, mcastIp, null);
837 // If an alternative is possible go ahead
838 if (alternativePath.isPresent()) {
839 // TODO trace
840 log.info("Alternative path {}", alternativePath.get().links());
841 } else {
842 // Otherwise try to come up with an alternative
843 // TODO trace
844 log.info("No alternative path");
845 Set<ConnectPoint> notAffectedSources = Sets.difference(sources, affectedSources);
846 Set<ConnectPoint> remainingSources = Sets.difference(notAffectedSources, processedSources);
847 alternativePath = recoverSinks(dstDevice, mcastIp, affectedSources, remainingSources);
848 processedSourcesByEgress.compute(dstDevice, (k, v) -> {
849 v = v == null ? Sets.newHashSet() : v;
850 v.addAll(affectedSources);
851 return v;
852 });
Piere99511d2018-04-19 16:47:06 +0200853 }
pier9e02ab72020-02-12 20:40:55 +0100854 // Recover from the failure if possible
855 Optional<Path> finalPath = alternativePath;
856 affectedSources.forEach(affectedSource -> {
857 // Update the mcastPath store
858 McastPathStoreKey mcastPathStoreKey = new McastPathStoreKey(mcastIp, affectedSource);
859 // Verify if there are local sinks
860 Set<DeviceId> localSinks = getSinks(mcastIp, srcDevice, affectedSource).stream()
861 .map(ConnectPoint::deviceId)
862 .collect(Collectors.toSet());
863 Set<DeviceId> goodDevices = goodDevicesBySource.compute(affectedSource.deviceId(), (k, v) -> {
864 v = v == null ? Sets.newHashSet() : v;
865 v.addAll(localSinks);
866 return v;
867 });
868 // TODO remove
869 log.info("Good devices {}", goodDevicesBySource);
870 Collection<? extends List<Link>> storedPathsBySource = Versioned.valueOrElse(
871 mcastPathStore.get(mcastPathStoreKey), Lists.newArrayList());
872 Optional<? extends List<Link>> storedPath = storedPathsBySource.stream()
873 .filter(path -> path.equals(affectedPath))
874 .findFirst();
875 // Remove bad links
876 affectedPath.forEach(affectedLink -> {
877 DeviceId affectedDevice = affectedLink.src().deviceId();
878 // If there is overlap with good paths - skip it
879 if (!goodLinks.contains(affectedLink)) {
880 removePortFromDevice(affectedDevice, affectedLink.src().port(), mcastIp,
881 mcastUtils.assignedVlan(affectedDevice.equals(affectedSource.deviceId()) ?
882 affectedSource : null));
883 }
884 // Remove role on the affected links if last
885 if (!goodDevices.contains(affectedDevice)) {
886 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, affectedDevice, affectedSource));
Charles Chanba59dd62018-05-10 22:19:49 +0000887 }
Piere99511d2018-04-19 16:47:06 +0200888 });
pier9e02ab72020-02-12 20:40:55 +0100889 // Sometimes the removal fails for serialization issue
890 // trying with the original object as workaround
891 if (storedPath.isPresent()) {
892 mcastPathStore.remove(mcastPathStoreKey, storedPath.get());
893 } else {
894 log.warn("Unable to find the corresponding path - trying removeal");
895 mcastPathStore.remove(mcastPathStoreKey, affectedPath);
896 }
897 // Program new links
898 if (finalPath.isPresent()) {
899 List<Link> links = finalPath.get().links();
900 installPath(mcastIp, affectedSource, links);
901 mcastPathStore.put(mcastPathStoreKey, links);
902 links.forEach(link -> goodDevices.add(link.src().deviceId()));
903 goodDevicesBySource.compute(srcDevice, (k, v) -> {
904 v = v == null ? Sets.newHashSet() : v;
905 v.addAll(goodDevices);
906 return v;
907 });
908 goodLinks.addAll(finalPath.get().links());
909 }
910 });
Piere99511d2018-04-19 16:47:06 +0200911 });
912 }
913
914 /**
pier9e02ab72020-02-12 20:40:55 +0100915 * Try to recover sinks using alternative locations.
Pier7b657162018-03-27 11:29:42 -0700916 *
pier9e02ab72020-02-12 20:40:55 +0100917 * @param notRecovered the device not recovered
Pier7b657162018-03-27 11:29:42 -0700918 * @param mcastIp the group address
pier9e02ab72020-02-12 20:40:55 +0100919 * @param affectedSources affected sources
920 * @param goodSources sources not affected
Pier7b657162018-03-27 11:29:42 -0700921 */
pier9e02ab72020-02-12 20:40:55 +0100922 private Optional<Path> recoverSinks(DeviceId notRecovered, IpAddress mcastIp,
923 Set<ConnectPoint> affectedSources,
924 Set<ConnectPoint> goodSources) {
925 log.debug("Processing recover sinks on {} for group {}", notRecovered, mcastIp);
926 Map<ConnectPoint, Set<ConnectPoint>> affectedSinksBySource = Maps.newHashMap();
927 Map<ConnectPoint, Set<ConnectPoint>> sinksBySource = Maps.newHashMap();
928 Set<ConnectPoint> sources = Sets.union(affectedSources, goodSources);
929 // Hosts influenced by the failure
930 Map<HostId, Set<ConnectPoint>> hostIdSetMap = mcastUtils.getAffectedSinks(notRecovered, mcastIp);
931 // Locations influenced by the failure
932 Set<ConnectPoint> affectedSinks = hostIdSetMap.values()
933 .stream()
934 .flatMap(Collection::stream)
935 .filter(connectPoint -> connectPoint.deviceId().equals(notRecovered))
936 .collect(Collectors.toSet());
937 // All locations
938 Set<ConnectPoint> sinks = hostIdSetMap.values()
939 .stream()
940 .flatMap(Collection::stream)
941 .collect(Collectors.toSet());
942 // Maps sinks with the sources
943 sources.forEach(source -> {
944 Set<ConnectPoint> currentSinks = affectedSinks.stream()
945 .filter(sink -> isSinkForSource(mcastIp, sink, source))
946 .collect(Collectors.toSet());
947 affectedSinksBySource.put(source, currentSinks);
Pier7b657162018-03-27 11:29:42 -0700948 });
pier9e02ab72020-02-12 20:40:55 +0100949 // Remove sinks one by one if they are not used by other sources
950 affectedSources.forEach(affectedSource -> {
951 Set<ConnectPoint> currentSinks = affectedSinksBySource.get(affectedSource);
952 log.info("Current sinks {} for source {}", currentSinks, affectedSource);
953 currentSinks.forEach(currentSink -> {
954 VlanId assignedVlan = mcastUtils.assignedVlan(
955 affectedSource.deviceId().equals(currentSink.deviceId()) ? affectedSource : null);
956 log.info("Assigned vlan {}", assignedVlan);
957 Set<VlanId> otherVlans = goodSources.stream()
958 .filter(remainingSource -> affectedSinksBySource.get(remainingSource).contains(currentSink))
959 .map(remainingSource -> mcastUtils.assignedVlan(
960 remainingSource.deviceId().equals(currentSink.deviceId()) ? remainingSource : null))
961 .collect(Collectors.toSet());
962 log.info("Other vlans {}", otherVlans);
963 // Sinks on other leaves
964 if (!otherVlans.contains(assignedVlan)) {
965 removePortFromDevice(currentSink.deviceId(), currentSink.port(), mcastIp, assignedVlan);
Pier7b657162018-03-27 11:29:42 -0700966 }
pier9e02ab72020-02-12 20:40:55 +0100967 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, currentSink.deviceId(), affectedSource));
968 });
Pier7b657162018-03-27 11:29:42 -0700969 });
pier9e02ab72020-02-12 20:40:55 +0100970 // Get the sinks to be added and the new egress
971 Set<DeviceId> newEgress = Sets.newHashSet();
972 affectedSources.forEach(affectedSource -> {
973 Set<ConnectPoint> currentSinks = affectedSinksBySource.get(affectedSource);
974 Set<ConnectPoint> newSinks = Sets.difference(sinks, currentSinks);
975 sinksBySource.put(affectedSource, newSinks);
976 newSinks.stream()
977 .map(ConnectPoint::deviceId)
978 .forEach(newEgress::add);
979 });
980 log.info("newEgress {}", newEgress);
981 // If there are more than one new egresses, return the problem
982 if (newEgress.size() != 1) {
983 log.warn("There are {} new egress, wrong configuration. Abort.", newEgress.size());
984 return Optional.empty();
985 }
986 DeviceId egress = newEgress.stream()
987 .findFirst()
988 .orElse(null);
989 DeviceId ingress = affectedSources.stream()
990 .map(ConnectPoint::deviceId)
991 .findFirst()
992 .orElse(null);
993 log.info("Ingress {}", ingress);
994 if (ingress == null) {
995 log.warn("No new ingress, wrong configuration. Abort.");
996 return Optional.empty();
997 }
998 // Get an alternative path
999 Optional<Path> alternativePath = getPath(ingress, egress, mcastIp, null);
1000 // If there are new path install sinks and return path
1001 if (alternativePath.isPresent()) {
1002 log.info("Alternative path {}", alternativePath.get().links());
1003 affectedSources.forEach(affectedSource -> {
1004 Set<ConnectPoint> newSinks = sinksBySource.get(affectedSource);
1005 newSinks.forEach(newSink -> {
1006 addPortToDevice(newSink.deviceId(), newSink.port(), mcastIp, mcastUtils.assignedVlan(null));
1007 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, newSink.deviceId(), affectedSource), EGRESS);
1008 });
1009 });
1010 return alternativePath;
1011 }
1012 // No new path but sinks co-located with sources install sinks and return empty
1013 if (ingress.equals(egress)) {
1014 log.info("No Alternative path but sinks co-located");
1015 affectedSources.forEach(affectedSource -> {
1016 Set<ConnectPoint> newSinks = sinksBySource.get(affectedSource);
1017 newSinks.forEach(newSink -> {
1018 if (affectedSource.port().equals(newSink.port())) {
1019 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
1020 mcastIp, newSink, affectedSource);
1021 return;
1022 }
1023 addPortToDevice(newSink.deviceId(), newSink.port(), mcastIp,
1024 mcastUtils.assignedVlan(affectedSource));
1025 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, newSink.deviceId(), affectedSource), INGRESS);
1026 });
1027 });
1028 }
1029 return Optional.empty();
Pier7b657162018-03-27 11:29:42 -07001030 }
1031
1032 /**
Pier28164682018-04-17 15:50:43 +02001033 * Process all the sinks related to a mcast group and return
1034 * the ones to be removed.
1035 *
1036 * @param mcastIp the group address
1037 * @param prevsinks the previous sinks to be evaluated
1038 * @param newSinks the new sinks to be evaluted
Piere99511d2018-04-19 16:47:06 +02001039 * @param source the source connect point
Pier28164682018-04-17 15:50:43 +02001040 * @return the set of the sinks to be removed
1041 */
1042 private Set<ConnectPoint> processSinksToBeRemoved(IpAddress mcastIp,
1043 Map<HostId, Set<ConnectPoint>> prevsinks,
Piere99511d2018-04-19 16:47:06 +02001044 Map<HostId, Set<ConnectPoint>> newSinks,
1045 ConnectPoint source) {
Pier28164682018-04-17 15:50:43 +02001046 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
piereaddb182020-02-03 13:50:53 +01001047 log.debug("Processing sinks to be removed for Multicast group {}, source {}",
1048 mcastIp, source);
Pier28164682018-04-17 15:50:43 +02001049 prevsinks.forEach(((hostId, connectPoints) -> {
Shekhar Aryan27bbe2a2019-06-20 14:03:07 +00001050 if (Objects.equal(HostId.NONE, hostId)) {
Esin Karamanf1f46e32019-03-05 13:49:02 +00001051 //in this case connect points are single homed sinks.
1052 //just found the difference btw previous and new sinks for this source.
1053 Set<ConnectPoint> difference = Sets.difference(connectPoints, newSinks.get(hostId));
1054 sinksToBeProcessed.addAll(difference);
1055 return;
1056 }
Pier28164682018-04-17 15:50:43 +02001057 // We have to check with the existing flows
1058 ConnectPoint sinkToBeProcessed = connectPoints.stream()
Piere99511d2018-04-19 16:47:06 +02001059 .filter(connectPoint -> isSinkForSource(mcastIp, connectPoint, source))
Pier28164682018-04-17 15:50:43 +02001060 .findFirst().orElse(null);
1061 if (sinkToBeProcessed != null) {
1062 // If the host has been removed or location has been removed
1063 if (!newSinks.containsKey(hostId) ||
1064 !newSinks.get(hostId).contains(sinkToBeProcessed)) {
1065 sinksToBeProcessed.add(sinkToBeProcessed);
1066 }
1067 }
1068 }));
1069 // We have done, return the set
1070 return sinksToBeProcessed;
1071 }
1072
1073 /**
Pier7b657162018-03-27 11:29:42 -07001074 * Process new locations and return the set of sinks to be added
1075 * in the context of the recovery.
1076 *
Pier28164682018-04-17 15:50:43 +02001077 * @param newSinks the remaining sinks
1078 * @param prevSinks the previous sinks
Piere99511d2018-04-19 16:47:06 +02001079 * @param source the source connect point
Pier7b657162018-03-27 11:29:42 -07001080 * @return the set of the sinks to be processed
1081 */
Charles Chanba59dd62018-05-10 22:19:49 +00001082 private Set<ConnectPoint> processSinksToBeRecovered(IpAddress mcastIp,
1083 Map<HostId, Set<ConnectPoint>> newSinks,
Piere99511d2018-04-19 16:47:06 +02001084 Map<HostId, Set<ConnectPoint>> prevSinks,
1085 ConnectPoint source) {
Pier7b657162018-03-27 11:29:42 -07001086 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
piereaddb182020-02-03 13:50:53 +01001087 log.debug("Processing sinks to be recovered for Multicast group {}, source {}",
1088 mcastIp, source);
Pier28164682018-04-17 15:50:43 +02001089 newSinks.forEach((hostId, connectPoints) -> {
Pier7b657162018-03-27 11:29:42 -07001090 // If it has more than 1 locations
1091 if (connectPoints.size() > 1 || connectPoints.size() == 0) {
1092 log.debug("Skip {} since sink {} has {} locations",
1093 mcastIp, hostId, connectPoints.size());
1094 return;
1095 }
Pier28164682018-04-17 15:50:43 +02001096 // If previously it had two locations, we need to recover it
1097 // Filter out if the remaining location is already served
1098 if (prevSinks.containsKey(hostId) && prevSinks.get(hostId).size() == 2) {
Pier665b0fc2018-04-19 15:53:20 +02001099 ConnectPoint sinkToBeProcessed = connectPoints.stream()
Piere99511d2018-04-19 16:47:06 +02001100 .filter(connectPoint -> !isSinkForSource(mcastIp, connectPoint, source))
Pier665b0fc2018-04-19 15:53:20 +02001101 .findFirst().orElse(null);
1102 if (sinkToBeProcessed != null) {
1103 sinksToBeProcessed.add(sinkToBeProcessed);
1104 }
Pier28164682018-04-17 15:50:43 +02001105 }
Pier7b657162018-03-27 11:29:42 -07001106 });
1107 return sinksToBeProcessed;
1108 }
1109
1110 /**
1111 * Process all the sinks related to a mcast group and return
1112 * the ones to be processed.
1113 *
1114 * @param source the source connect point
1115 * @param mcastIp the group address
1116 * @param sinks the sinks to be evaluated
1117 * @return the set of the sinks to be processed
1118 */
1119 private Set<ConnectPoint> processSinksToBeAdded(ConnectPoint source, IpAddress mcastIp,
1120 Map<HostId, Set<ConnectPoint>> sinks) {
Pier7b657162018-03-27 11:29:42 -07001121 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
piereaddb182020-02-03 13:50:53 +01001122 log.debug("Processing sinks to be added for Multicast group {}, source {}",
1123 mcastIp, source);
Pier7b657162018-03-27 11:29:42 -07001124 sinks.forEach(((hostId, connectPoints) -> {
Esin Karamanf1f46e32019-03-05 13:49:02 +00001125 //add all connect points that are not tied with any host
Shekhar Aryan27bbe2a2019-06-20 14:03:07 +00001126 if (Objects.equal(HostId.NONE, hostId)) {
Esin Karamanf1f46e32019-03-05 13:49:02 +00001127 sinksToBeProcessed.addAll(connectPoints);
1128 return;
1129 }
Pier7b657162018-03-27 11:29:42 -07001130 // If it has more than 2 locations
1131 if (connectPoints.size() > 2 || connectPoints.size() == 0) {
1132 log.debug("Skip {} since sink {} has {} locations",
1133 mcastIp, hostId, connectPoints.size());
1134 return;
1135 }
1136 // If it has one location, just use it
1137 if (connectPoints.size() == 1) {
Piere99511d2018-04-19 16:47:06 +02001138 sinksToBeProcessed.add(connectPoints.stream().findFirst().orElse(null));
Pier7b657162018-03-27 11:29:42 -07001139 return;
1140 }
1141 // We prefer to reuse existing flows
1142 ConnectPoint sinkToBeProcessed = connectPoints.stream()
Piere99511d2018-04-19 16:47:06 +02001143 .filter(connectPoint -> {
1144 if (!isSinkForGroup(mcastIp, connectPoint, source)) {
1145 return false;
1146 }
1147 if (!isSinkReachable(mcastIp, connectPoint, source)) {
1148 return false;
1149 }
1150 ConnectPoint other = connectPoints.stream()
Charles Chanba59dd62018-05-10 22:19:49 +00001151 .filter(remaining -> !remaining.equals(connectPoint))
1152 .findFirst().orElse(null);
Piere99511d2018-04-19 16:47:06 +02001153 // We are already serving the sink
1154 return !isSinkForSource(mcastIp, other, source);
1155 }).findFirst().orElse(null);
1156
Pier7b657162018-03-27 11:29:42 -07001157 if (sinkToBeProcessed != null) {
1158 sinksToBeProcessed.add(sinkToBeProcessed);
1159 return;
1160 }
1161 // Otherwise we prefer to reuse existing egresses
Piere99511d2018-04-19 16:47:06 +02001162 Set<DeviceId> egresses = getDevice(mcastIp, EGRESS, source);
Pier7b657162018-03-27 11:29:42 -07001163 sinkToBeProcessed = connectPoints.stream()
Piere99511d2018-04-19 16:47:06 +02001164 .filter(connectPoint -> {
1165 if (!egresses.contains(connectPoint.deviceId())) {
1166 return false;
1167 }
1168 if (!isSinkReachable(mcastIp, connectPoint, source)) {
1169 return false;
1170 }
1171 ConnectPoint other = connectPoints.stream()
Charles Chanba59dd62018-05-10 22:19:49 +00001172 .filter(remaining -> !remaining.equals(connectPoint))
1173 .findFirst().orElse(null);
Piere99511d2018-04-19 16:47:06 +02001174 return !isSinkForSource(mcastIp, other, source);
1175 }).findFirst().orElse(null);
Pier7b657162018-03-27 11:29:42 -07001176 if (sinkToBeProcessed != null) {
1177 sinksToBeProcessed.add(sinkToBeProcessed);
1178 return;
1179 }
1180 // Otherwise we prefer a location co-located with the source (if it exists)
1181 sinkToBeProcessed = connectPoints.stream()
1182 .filter(connectPoint -> connectPoint.deviceId().equals(source.deviceId()))
1183 .findFirst().orElse(null);
1184 if (sinkToBeProcessed != null) {
1185 sinksToBeProcessed.add(sinkToBeProcessed);
1186 return;
1187 }
Piere99511d2018-04-19 16:47:06 +02001188 // Finally, we randomly pick a new location if it is reachable
1189 sinkToBeProcessed = connectPoints.stream()
1190 .filter(connectPoint -> {
1191 if (!isSinkReachable(mcastIp, connectPoint, source)) {
1192 return false;
1193 }
1194 ConnectPoint other = connectPoints.stream()
Charles Chanba59dd62018-05-10 22:19:49 +00001195 .filter(remaining -> !remaining.equals(connectPoint))
1196 .findFirst().orElse(null);
Piere99511d2018-04-19 16:47:06 +02001197 return !isSinkForSource(mcastIp, other, source);
1198 }).findFirst().orElse(null);
1199 if (sinkToBeProcessed != null) {
1200 sinksToBeProcessed.add(sinkToBeProcessed);
1201 }
Pier7b657162018-03-27 11:29:42 -07001202 }));
Pier7b657162018-03-27 11:29:42 -07001203 return sinksToBeProcessed;
1204 }
1205
1206 /**
Charles Chanc91c8782016-03-30 17:54:24 -07001207 * Adds a port to given multicast group on given device. This involves the
1208 * update of L3 multicast group and multicast routing table entry.
1209 *
1210 * @param deviceId device ID
1211 * @param port port to be added
1212 * @param mcastIp multicast group
1213 * @param assignedVlan assigned VLAN ID
1214 */
Charles Chanba59dd62018-05-10 22:19:49 +00001215 private void addPortToDevice(DeviceId deviceId, PortNumber port,
1216 IpAddress mcastIp, VlanId assignedVlan) {
pier9e02ab72020-02-12 20:40:55 +01001217 // TODO trace
1218 log.info("Adding {} on {}/{} and vlan {}", mcastIp, deviceId, port, assignedVlan);
Piere99511d2018-04-19 16:47:06 +02001219 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId, assignedVlan);
Charles Chanc91c8782016-03-30 17:54:24 -07001220 ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
Pier Luigi4f0dd212018-01-19 10:24:53 +01001221 NextObjective newNextObj;
Charles Chan72779502016-04-23 17:36:10 -07001222 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Charles Chanc91c8782016-03-30 17:54:24 -07001223 // First time someone request this mcast group via this device
1224 portBuilder.add(port);
Pier Luigi4f0dd212018-01-19 10:24:53 +01001225 // New nextObj
Vignesh Ethiraj75790122019-08-26 12:18:42 +00001226 if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
1227 log.debug("Passing 0 as nextId for unconfigured device {}", deviceId);
1228 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
1229 portBuilder.build(), 0).add();
1230 } else {
1231 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
1232 portBuilder.build(), null).add();
1233 }
Pier Luigi4f0dd212018-01-19 10:24:53 +01001234 // Store the new port
1235 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Harshada Chaundkar9204f312019-07-02 16:01:24 +00001236 // Create, store and apply the new nextObj and fwdObj
1237 ObjectiveContext context = new DefaultObjectiveContext(
1238 (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}",
1239 mcastIp, deviceId, port.toLong(), assignedVlan),
1240 (objective, error) -> {
1241 log.warn("Failed to add {} on {}/{}, vlan {}: {}",
1242 mcastIp, deviceId, port.toLong(), assignedVlan, error);
piere23cd862020-03-04 14:36:41 +01001243 // Schedule the removal using directly the key
1244 mcastWorker.execute(() -> mcastNextObjStore.remove(mcastStoreKey));
Harshada Chaundkar9204f312019-07-02 16:01:24 +00001245 });
1246 ForwardingObjective fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan,
1247 newNextObj.id()).add(context);
1248 if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
1249 log.debug("skip next and forward flowobjective addition for device: {}", deviceId);
1250 } else {
1251 srManager.flowObjectiveService.next(deviceId, newNextObj);
1252 srManager.flowObjectiveService.forward(deviceId, fwdObj);
1253 }
Charles Chanc91c8782016-03-30 17:54:24 -07001254 } else {
1255 // This device already serves some subscribers of this mcast group
Charles Chan72779502016-04-23 17:36:10 -07001256 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chanc91c8782016-03-30 17:54:24 -07001257 // Stop if the port is already in the nextobj
Pier7b657162018-03-27 11:29:42 -07001258 Set<PortNumber> existingPorts = mcastUtils.getPorts(nextObj.next());
Charles Chanc91c8782016-03-30 17:54:24 -07001259 if (existingPorts.contains(port)) {
piereaddb182020-02-03 13:50:53 +01001260 log.debug("Port {}/{} already exists for {}. Abort", deviceId, port, mcastIp);
Charles Chanc91c8782016-03-30 17:54:24 -07001261 return;
1262 }
Pier Luigi4f0dd212018-01-19 10:24:53 +01001263 // Let's add the port and reuse the previous one
Yuta HIGUCHIbef07b52018-02-09 18:05:23 -08001264 portBuilder.addAll(existingPorts).add(port);
Pier Luigi4f0dd212018-01-19 10:24:53 +01001265 // Reuse previous nextObj
Pier7b657162018-03-27 11:29:42 -07001266 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi4f0dd212018-01-19 10:24:53 +01001267 portBuilder.build(), nextObj.id()).addToExisting();
1268 // Store the final next objective and send only the difference to the driver
1269 mcastNextObjStore.put(mcastStoreKey, newNextObj);
1270 // Add just the new port
1271 portBuilder = ImmutableSet.builder();
1272 portBuilder.add(port);
Pier7b657162018-03-27 11:29:42 -07001273 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi4f0dd212018-01-19 10:24:53 +01001274 portBuilder.build(), nextObj.id()).addToExisting();
Harshada Chaundkar9204f312019-07-02 16:01:24 +00001275 if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
1276 log.debug("skip next flowobjective update for device: {}", deviceId);
1277 } else {
1278 // no need to update the flow here since we have updated the nextobjective/group
1279 // the existing flow will keep pointing to the updated nextobj
1280 srManager.flowObjectiveService.next(deviceId, newNextObj);
1281 }
Vignesh Ethiraj75790122019-08-26 12:18:42 +00001282 }
Charles Chanc91c8782016-03-30 17:54:24 -07001283 }
1284
1285 /**
1286 * Removes a port from given multicast group on given device.
1287 * This involves the update of L3 multicast group and multicast routing
1288 * table entry.
1289 *
1290 * @param deviceId device ID
1291 * @param port port to be added
1292 * @param mcastIp multicast group
1293 * @param assignedVlan assigned VLAN ID
1294 * @return true if this is the last sink on this device
1295 */
Charles Chanba59dd62018-05-10 22:19:49 +00001296 private boolean removePortFromDevice(DeviceId deviceId, PortNumber port,
1297 IpAddress mcastIp, VlanId assignedVlan) {
pier9e02ab72020-02-12 20:40:55 +01001298 // TODO trace
1299 log.info("Removing {} on {}/{} and vlan {}", mcastIp, deviceId, port, assignedVlan);
Charles Chan72779502016-04-23 17:36:10 -07001300 McastStoreKey mcastStoreKey =
Piere99511d2018-04-19 16:47:06 +02001301 new McastStoreKey(mcastIp, deviceId, assignedVlan);
Charles Chanc91c8782016-03-30 17:54:24 -07001302 // This device is not serving this multicast group
Charles Chan72779502016-04-23 17:36:10 -07001303 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Piere99511d2018-04-19 16:47:06 +02001304 return true;
Charles Chanc91c8782016-03-30 17:54:24 -07001305 }
Charles Chan72779502016-04-23 17:36:10 -07001306 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Pier7b657162018-03-27 11:29:42 -07001307 Set<PortNumber> existingPorts = mcastUtils.getPorts(nextObj.next());
Charles Chan72779502016-04-23 17:36:10 -07001308 // This port does not serve this multicast group
Charles Chanc91c8782016-03-30 17:54:24 -07001309 if (!existingPorts.contains(port)) {
Piere99511d2018-04-19 16:47:06 +02001310 if (!existingPorts.isEmpty()) {
piereaddb182020-02-03 13:50:53 +01001311 log.debug("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
Piere99511d2018-04-19 16:47:06 +02001312 return false;
1313 }
1314 return true;
Charles Chanc91c8782016-03-30 17:54:24 -07001315 }
1316 // Copy and modify the ImmutableSet
1317 existingPorts = Sets.newHashSet(existingPorts);
1318 existingPorts.remove(port);
Charles Chanc91c8782016-03-30 17:54:24 -07001319 NextObjective newNextObj;
Pier Luigi8cd46de2018-01-19 10:24:53 +01001320 ObjectiveContext context;
Charles Chanc91c8782016-03-30 17:54:24 -07001321 ForwardingObjective fwdObj;
1322 if (existingPorts.isEmpty()) {
Pier Luigi8cd46de2018-01-19 10:24:53 +01001323 context = new DefaultObjectiveContext(
Charles Chan72779502016-04-23 17:36:10 -07001324 (objective) -> log.debug("Successfully remove {} on {}/{}, vlan {}",
1325 mcastIp, deviceId, port.toLong(), assignedVlan),
Piere99511d2018-04-19 16:47:06 +02001326 (objective, error) -> log.warn("Failed to remove {} on {}/{}, vlan {}: {}",
Charles Chan72779502016-04-23 17:36:10 -07001327 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Pier7b657162018-03-27 11:29:42 -07001328 fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
Harshada Chaundkar9204f312019-07-02 16:01:24 +00001329 if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
1330 log.debug("skip forward flowobjective removal for device: {}", deviceId);
1331 } else {
1332 srManager.flowObjectiveService.forward(deviceId, fwdObj);
1333 }
Charles Chan72779502016-04-23 17:36:10 -07001334 mcastNextObjStore.remove(mcastStoreKey);
Charles Chanc91c8782016-03-30 17:54:24 -07001335 } else {
Pier Luigi8cd46de2018-01-19 10:24:53 +01001336 // Here we store the next objective with the remaining port
Pier7b657162018-03-27 11:29:42 -07001337 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi8cd46de2018-01-19 10:24:53 +01001338 existingPorts, nextObj.id()).removeFromExisting();
Charles Chan72779502016-04-23 17:36:10 -07001339 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Harshada Chaundkar9204f312019-07-02 16:01:24 +00001340 // Let's modify the next objective removing the bucket
1341 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi8cd46de2018-01-19 10:24:53 +01001342 ImmutableSet.of(port), nextObj.id()).removeFromExisting();
Harshada Chaundkar9204f312019-07-02 16:01:24 +00001343 if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
1344 log.debug("skip next flowobjective update for device: {}", deviceId);
1345 } else {
1346 // no need to update the flow here since we have updated the next objective + group
1347 // the existing flow will keep pointing to the updated nextobj
1348 srManager.flowObjectiveService.next(deviceId, newNextObj);
1349 }
Vignesh Ethiraj75790122019-08-26 12:18:42 +00001350 }
Charles Chanc91c8782016-03-30 17:54:24 -07001351 return existingPorts.isEmpty();
1352 }
1353
Charles Chan72779502016-04-23 17:36:10 -07001354 /**
1355 * Removes entire group on given device.
1356 *
1357 * @param deviceId device ID
1358 * @param mcastIp multicast group to be removed
1359 * @param assignedVlan assigned VLAN ID
1360 */
Charles Chanba59dd62018-05-10 22:19:49 +00001361 private void removeGroupFromDevice(DeviceId deviceId, IpAddress mcastIp,
1362 VlanId assignedVlan) {
pier9e02ab72020-02-12 20:40:55 +01001363 // TODO trace
1364 log.info("Removing {} on {} and vlan {}", mcastIp, deviceId, assignedVlan);
Piere99511d2018-04-19 16:47:06 +02001365 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId, assignedVlan);
Charles Chan72779502016-04-23 17:36:10 -07001366 // This device is not serving this multicast group
1367 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
piereaddb182020-02-03 13:50:53 +01001368 log.debug("{} is not serving {}. Abort.", deviceId, mcastIp);
Charles Chan72779502016-04-23 17:36:10 -07001369 return;
1370 }
1371 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chan72779502016-04-23 17:36:10 -07001372 ObjectiveContext context = new DefaultObjectiveContext(
1373 (objective) -> log.debug("Successfully remove {} on {}, vlan {}",
1374 mcastIp, deviceId, assignedVlan),
Piere99511d2018-04-19 16:47:06 +02001375 (objective, error) -> log.warn("Failed to remove {} on {}, vlan {}: {}",
Charles Chan72779502016-04-23 17:36:10 -07001376 mcastIp, deviceId, assignedVlan, error));
Vignesh Ethiraj75790122019-08-26 12:18:42 +00001377 if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
1378 log.debug("skip flow changes on unconfigured device: {}", deviceId);
1379 } else {
1380 ForwardingObjective fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
1381 srManager.flowObjectiveService.forward(deviceId, fwdObj);
1382 }
Charles Chan72779502016-04-23 17:36:10 -07001383 mcastNextObjStore.remove(mcastStoreKey);
Charles Chan72779502016-04-23 17:36:10 -07001384 }
1385
pier9e02ab72020-02-12 20:40:55 +01001386 private void installPath(IpAddress mcastIp, ConnectPoint source, List<Link> links) {
kezhiyong168fbba2018-12-03 16:14:29 +08001387 if (links.isEmpty()) {
1388 log.warn("There is no link that can be used. Stopping installation.");
1389 return;
1390 }
Pier1a7e0c02018-03-12 15:00:54 -07001391 // Setup new ingress mcast role
Piere99511d2018-04-19 16:47:06 +02001392 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, links.get(0).src().deviceId(), source),
Pier1a7e0c02018-03-12 15:00:54 -07001393 INGRESS);
Pier Luigi580fd8a2018-01-16 10:47:50 +01001394 // For each link, modify the next on the source device adding the src port
1395 // and a new filter objective on the destination port
1396 links.forEach(link -> {
1397 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
pier9e02ab72020-02-12 20:40:55 +01001398 mcastUtils.assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
Harshada Chaundkar9204f312019-07-02 16:01:24 +00001399 McastFilteringObjStoreKey mcastFilterObjStoreKey = new McastFilteringObjStoreKey(link.dst(),
1400 mcastUtils.assignedVlan(null), mcastIp.isIp4());
1401 addFilterToDevice(mcastFilterObjStoreKey, mcastIp, null);
Pier Luigi580fd8a2018-01-16 10:47:50 +01001402 });
Pier1a7e0c02018-03-12 15:00:54 -07001403 // Setup mcast role for the transit
1404 links.stream()
1405 .filter(link -> !link.src().deviceId().equals(source.deviceId()))
Piere99511d2018-04-19 16:47:06 +02001406 .forEach(link -> mcastRoleStore.put(new McastRoleStoreKey(mcastIp, link.src().deviceId(), source),
Pier1a7e0c02018-03-12 15:00:54 -07001407 TRANSIT));
Charles Chan72779502016-04-23 17:36:10 -07001408 }
1409
Charles Chanc91c8782016-03-30 17:54:24 -07001410 /**
Charles Chanc91c8782016-03-30 17:54:24 -07001411 * Gets a path from src to dst.
1412 * If a path was allocated before, returns the allocated path.
1413 * Otherwise, randomly pick one from available paths.
1414 *
1415 * @param src source device ID
1416 * @param dst destination device ID
1417 * @param mcastIp multicast group
Pier1f87aca2018-03-14 16:47:32 -07001418 * @param allPaths paths list
pier9e02ab72020-02-12 20:40:55 +01001419 *
Charles Chanc91c8782016-03-30 17:54:24 -07001420 * @return an optional path from src to dst
1421 */
pier9e02ab72020-02-12 20:40:55 +01001422 private Optional<Path> getPath(DeviceId src, DeviceId dst,
1423 IpAddress mcastIp, List<Path> allPaths) {
Pier1f87aca2018-03-14 16:47:32 -07001424 if (allPaths == null) {
pier9e02ab72020-02-12 20:40:55 +01001425 allPaths = mcastUtils.getPaths(src, dst, Collections.emptySet());
Pier1f87aca2018-03-14 16:47:32 -07001426 }
Charles Chanc91c8782016-03-30 17:54:24 -07001427 if (allPaths.isEmpty()) {
Charles Chanc91c8782016-03-30 17:54:24 -07001428 return Optional.empty();
1429 }
Piere99511d2018-04-19 16:47:06 +02001430 // Create a map index of suitability-to-list of paths. For example
pier9e02ab72020-02-12 20:40:55 +01001431 // a path in the list associated to the index 1 shares only one link
1432 // and it is less suitable of a path belonging to the index 2
Pier Luigi91573e12018-01-23 16:06:38 +01001433 Map<Integer, List<Path>> eligiblePaths = Maps.newHashMap();
pier9e02ab72020-02-12 20:40:55 +01001434 int score;
1435 // Let's build the multicast tree
1436 Set<List<Link>> storedPaths = getStoredPaths(mcastIp);
1437 Set<Link> storedTree = storedPaths.stream()
1438 .flatMap(Collection::stream).collect(Collectors.toSet());
1439 log.trace("Stored tree {}", storedTree);
1440 Set<Link> pathLinks;
Pier Luigi91573e12018-01-23 16:06:38 +01001441 for (Path path : allPaths) {
Pier Luigi91573e12018-01-23 16:06:38 +01001442 if (!src.equals(path.links().get(0).src().deviceId())) {
1443 continue;
1444 }
pier9e02ab72020-02-12 20:40:55 +01001445 pathLinks = Sets.newHashSet(path.links());
1446 score = Sets.intersection(pathLinks, storedTree).size();
1447 // score defines the index
1448 if (score > 0) {
1449 eligiblePaths.compute(score, (index, paths) -> {
Pier Luigi91573e12018-01-23 16:06:38 +01001450 paths = paths == null ? Lists.newArrayList() : paths;
1451 paths.add(path);
1452 return paths;
1453 });
Charles Chanc91c8782016-03-30 17:54:24 -07001454 }
1455 }
Pier Luigi91573e12018-01-23 16:06:38 +01001456 if (eligiblePaths.isEmpty()) {
piereaddb182020-02-03 13:50:53 +01001457 log.trace("No eligiblePath(s) found from {} to {}", src, dst);
Pier Luigi91573e12018-01-23 16:06:38 +01001458 Collections.shuffle(allPaths);
1459 return allPaths.stream().findFirst();
1460 }
Pier Luigi91573e12018-01-23 16:06:38 +01001461 // Let's take the best ones
Piere99511d2018-04-19 16:47:06 +02001462 Integer bestIndex = eligiblePaths.keySet().stream()
1463 .sorted(Comparator.reverseOrder()).findFirst().orElse(null);
Pier Luigi91573e12018-01-23 16:06:38 +01001464 List<Path> bestPaths = eligiblePaths.get(bestIndex);
piereaddb182020-02-03 13:50:53 +01001465 log.trace("{} eligiblePath(s) found from {} to {}",
Pier Luigi91573e12018-01-23 16:06:38 +01001466 bestPaths.size(), src, dst);
Pier Luigi91573e12018-01-23 16:06:38 +01001467 Collections.shuffle(bestPaths);
1468 return bestPaths.stream().findFirst();
Charles Chanc91c8782016-03-30 17:54:24 -07001469 }
1470
1471 /**
pier9e02ab72020-02-12 20:40:55 +01001472 * Gets stored paths of the group.
1473 *
1474 * @param mcastIp group address
1475 * @return a collection of paths
1476 */
1477 private Set<List<Link>> getStoredPaths(IpAddress mcastIp) {
1478 return mcastPathStore.stream()
1479 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp))
1480 .map(Entry::getValue)
1481 .collect(Collectors.toSet());
1482 }
1483
1484 /**
Piere99511d2018-04-19 16:47:06 +02001485 * Gets device(s) of given role and of given source in given multicast tree.
1486 *
1487 * @param mcastIp multicast IP
1488 * @param role multicast role
1489 * @param source source connect point
1490 * @return set of device ID or empty set if not found
1491 */
1492 private Set<DeviceId> getDevice(IpAddress mcastIp, McastRole role, ConnectPoint source) {
1493 return mcastRoleStore.entrySet().stream()
1494 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
Charles Chanba59dd62018-05-10 22:19:49 +00001495 entry.getKey().source().equals(source) &&
1496 entry.getValue().value() == role)
Piere99511d2018-04-19 16:47:06 +02001497 .map(Entry::getKey).map(McastRoleStoreKey::deviceId).collect(Collectors.toSet());
1498 }
1499
1500 /**
Charles Chan72779502016-04-23 17:36:10 -07001501 * Gets device(s) of given role in given multicast group.
1502 *
1503 * @param mcastIp multicast IP
1504 * @param role multicast role
1505 * @return set of device ID or empty set if not found
1506 */
1507 private Set<DeviceId> getDevice(IpAddress mcastIp, McastRole role) {
1508 return mcastRoleStore.entrySet().stream()
1509 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
1510 entry.getValue().value() == role)
Piere99511d2018-04-19 16:47:06 +02001511 .map(Entry::getKey).map(McastRoleStoreKey::deviceId).collect(Collectors.toSet());
1512 }
1513
1514 /**
Piere99511d2018-04-19 16:47:06 +02001515 * Gets source(s) of given multicast group.
1516 *
1517 * @param mcastIp multicast IP
1518 * @return set of device ID or empty set if not found
1519 */
1520 private Set<ConnectPoint> getSources(IpAddress mcastIp) {
1521 return mcastRoleStore.entrySet().stream()
1522 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp))
1523 .map(Entry::getKey).map(McastRoleStoreKey::source).collect(Collectors.toSet());
Charles Chan72779502016-04-23 17:36:10 -07001524 }
1525
1526 /**
pier9e02ab72020-02-12 20:40:55 +01001527 * Gets sink(s) of given multicast group.
1528 *
1529 * @param mcastIp multicast IP
1530 * @return set of connect point or empty set if not found
1531 */
1532 private Set<ConnectPoint> getSinks(IpAddress mcastIp, DeviceId device, ConnectPoint source) {
1533 McastPathStoreKey pathStoreKey = new McastPathStoreKey(mcastIp, source);
1534 Collection<? extends List<Link>> storedPaths = Versioned.valueOrElse(
1535 mcastPathStore.get(pathStoreKey), Lists.newArrayList());
1536 VlanId assignedVlan = mcastUtils.assignedVlan(device.equals(source.deviceId()) ? source : null);
1537 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, device, assignedVlan);
1538 NextObjective nextObjective = Versioned.valueOrNull(mcastNextObjStore.get(mcastStoreKey));
1539 ImmutableSet.Builder<ConnectPoint> cpBuilder = ImmutableSet.builder();
1540 if (nextObjective != null) {
1541 Set<PortNumber> outputPorts = mcastUtils.getPorts(nextObjective.next());
1542 outputPorts.forEach(portNumber -> cpBuilder.add(new ConnectPoint(device, portNumber)));
1543 }
1544 Set<ConnectPoint> egressCp = cpBuilder.build();
1545 return egressCp.stream()
1546 .filter(connectPoint -> !mcastUtils.isInfraPort(connectPoint, storedPaths))
1547 .collect(Collectors.toSet());
1548 }
1549
1550
1551
1552 /**
Charles Chan72779502016-04-23 17:36:10 -07001553 * Gets groups which is affected by the link down event.
1554 *
1555 * @param link link going down
1556 * @return a set of multicast IpAddress
1557 */
1558 private Set<IpAddress> getAffectedGroups(Link link) {
1559 DeviceId deviceId = link.src().deviceId();
1560 PortNumber port = link.src().port();
1561 return mcastNextObjStore.entrySet().stream()
1562 .filter(entry -> entry.getKey().deviceId().equals(deviceId) &&
Piere99511d2018-04-19 16:47:06 +02001563 mcastUtils.getPorts(entry.getValue().value().next()).contains(port))
1564 .map(Entry::getKey).map(McastStoreKey::mcastIp).collect(Collectors.toSet());
Charles Chan72779502016-04-23 17:36:10 -07001565 }
1566
1567 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +01001568 * Gets groups which are affected by the device down event.
1569 *
1570 * @param deviceId device going down
1571 * @return a set of multicast IpAddress
1572 */
1573 private Set<IpAddress> getAffectedGroups(DeviceId deviceId) {
1574 return mcastNextObjStore.entrySet().stream()
1575 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
Pier1f87aca2018-03-14 16:47:32 -07001576 .map(Entry::getKey).map(McastStoreKey::mcastIp)
Pier Luigi580fd8a2018-01-16 10:47:50 +01001577 .collect(Collectors.toSet());
1578 }
1579
1580 /**
Pier28164682018-04-17 15:50:43 +02001581 * Verify if a given connect point is sink for this group.
1582 *
1583 * @param mcastIp group address
1584 * @param connectPoint connect point to be verified
Piere99511d2018-04-19 16:47:06 +02001585 * @param source source connect point
Pier28164682018-04-17 15:50:43 +02001586 * @return true if the connect point is sink of the group
1587 */
Charles Chanba59dd62018-05-10 22:19:49 +00001588 private boolean isSinkForGroup(IpAddress mcastIp, ConnectPoint connectPoint,
1589 ConnectPoint source) {
Piere99511d2018-04-19 16:47:06 +02001590 VlanId assignedVlan = mcastUtils.assignedVlan(connectPoint.deviceId().equals(source.deviceId()) ?
1591 source : null);
1592 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, connectPoint.deviceId(), assignedVlan);
Pier28164682018-04-17 15:50:43 +02001593 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
1594 return false;
1595 }
Pier28164682018-04-17 15:50:43 +02001596 NextObjective mcastNext = mcastNextObjStore.get(mcastStoreKey).value();
1597 return mcastUtils.getPorts(mcastNext.next()).contains(connectPoint.port());
1598 }
1599
1600 /**
Piere99511d2018-04-19 16:47:06 +02001601 * Verify if a given connect point is sink for this group and for this source.
1602 *
1603 * @param mcastIp group address
1604 * @param connectPoint connect point to be verified
1605 * @param source source connect point
1606 * @return true if the connect point is sink of the group
1607 */
Charles Chanba59dd62018-05-10 22:19:49 +00001608 private boolean isSinkForSource(IpAddress mcastIp, ConnectPoint connectPoint,
1609 ConnectPoint source) {
Piere99511d2018-04-19 16:47:06 +02001610 boolean isSink = isSinkForGroup(mcastIp, connectPoint, source);
1611 DeviceId device;
1612 if (connectPoint.deviceId().equals(source.deviceId())) {
1613 device = getDevice(mcastIp, INGRESS, source).stream()
1614 .filter(deviceId -> deviceId.equals(connectPoint.deviceId()))
1615 .findFirst().orElse(null);
1616 } else {
1617 device = getDevice(mcastIp, EGRESS, source).stream()
1618 .filter(deviceId -> deviceId.equals(connectPoint.deviceId()))
1619 .findFirst().orElse(null);
1620 }
1621 return isSink && device != null;
1622 }
1623
1624 /**
1625 * Verify if a sink is reachable from this source.
1626 *
1627 * @param mcastIp group address
1628 * @param sink connect point to be verified
1629 * @param source source connect point
1630 * @return true if the connect point is reachable from the source
1631 */
Charles Chanba59dd62018-05-10 22:19:49 +00001632 private boolean isSinkReachable(IpAddress mcastIp, ConnectPoint sink,
1633 ConnectPoint source) {
Piere99511d2018-04-19 16:47:06 +02001634 return sink.deviceId().equals(source.deviceId()) ||
pier9e02ab72020-02-12 20:40:55 +01001635 getPath(source.deviceId(), sink.deviceId(), mcastIp, null).isPresent();
Piere99511d2018-04-19 16:47:06 +02001636 }
1637
1638 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +01001639 * Updates filtering objective for given device and port.
1640 * It is called in general when the mcast config has been
1641 * changed.
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001642 *
1643 * @param deviceId device ID
1644 * @param portNum ingress port number
1645 * @param vlanId assigned VLAN ID
1646 * @param install true to add, false to remove
1647 */
Charles Chanba59dd62018-05-10 22:19:49 +00001648 public void updateFilterToDevice(DeviceId deviceId, PortNumber portNum,
1649 VlanId vlanId, boolean install) {
pier62e0b072019-12-23 19:21:49 +01001650 mcastWorker.execute(() -> updateFilterToDeviceInternal(deviceId, portNum, vlanId, install));
1651 }
1652
1653 private void updateFilterToDeviceInternal(DeviceId deviceId, PortNumber portNum,
1654 VlanId vlanId, boolean install) {
1655 lastMcastChange.set(Instant.now());
1656 // Iterates over the route and updates properly the filtering objective on the source device.
1657 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
1658 log.debug("Update filter for {}", mcastRoute.group());
1659 if (!mcastUtils.isLeader(mcastRoute.group())) {
1660 log.debug("Skip {} due to lack of leadership", mcastRoute.group());
1661 return;
1662 }
1663 // Get the sources and for each one update properly the filtering objectives
1664 Set<ConnectPoint> sources = srManager.multicastRouteService.sources(mcastRoute);
1665 sources.forEach(source -> {
1666 if (source.deviceId().equals(deviceId) && source.port().equals(portNum)) {
1667 if (install) {
1668 McastFilteringObjStoreKey mcastFilterObjStoreKey = new McastFilteringObjStoreKey(source,
1669 vlanId, mcastRoute.group().isIp4());
1670 addFilterToDevice(mcastFilterObjStoreKey, mcastRoute.group(), INGRESS);
1671 } else {
1672 mcastUtils.removeFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), null);
Pier Luigi35dab3f2018-01-25 16:16:02 +01001673 }
pier62e0b072019-12-23 19:21:49 +01001674 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001675 });
pier62e0b072019-12-23 19:21:49 +01001676 });
Pier Luigi35dab3f2018-01-25 16:16:02 +01001677 }
1678
1679 /**
Harshada Chaundkar9204f312019-07-02 16:01:24 +00001680 * Add filtering to the device if needed.
1681 *
1682 * @param filterObjStoreKey the filtering obj key
1683 * @param mcastIp the multicast group
1684 * @param mcastRole the multicast role
1685 */
1686 private void addFilterToDevice(McastFilteringObjStoreKey filterObjStoreKey,
1687 IpAddress mcastIp,
1688 McastRole mcastRole) {
1689 if (!containsFilterInTheDevice(filterObjStoreKey)) {
1690 // if this is the first sink for this group/device
1691 // match additionally on mac
1692 log.debug("Filtering not available for device {}, vlan {} and {}",
1693 filterObjStoreKey.ingressCP().deviceId(), filterObjStoreKey.vlanId(),
1694 filterObjStoreKey.isIpv4() ? "IPv4" : "IPv6");
1695 mcastUtils.addFilterToDevice(filterObjStoreKey.ingressCP().deviceId(),
1696 filterObjStoreKey.ingressCP().port(),
1697 filterObjStoreKey.vlanId(), mcastIp,
1698 mcastRole, true);
1699 mcastFilteringObjStore.add(filterObjStoreKey);
1700 } else if (!mcastFilteringObjStore.contains(filterObjStoreKey)) {
1701 // match only vlan
1702 log.debug("Filtering not available for connect point {}, vlan {} and {}",
1703 filterObjStoreKey.ingressCP(), filterObjStoreKey.vlanId(),
1704 filterObjStoreKey.isIpv4() ? "IPv4" : "IPv6");
1705 mcastUtils.addFilterToDevice(filterObjStoreKey.ingressCP().deviceId(),
1706 filterObjStoreKey.ingressCP().port(),
1707 filterObjStoreKey.vlanId(), mcastIp,
1708 mcastRole, false);
1709 mcastFilteringObjStore.add(filterObjStoreKey);
1710 } else {
1711 // do nothing
pier9e02ab72020-02-12 20:40:55 +01001712 log.debug("Filtering already present for connect point {}, vlan {} and {}. Abort",
1713 filterObjStoreKey.ingressCP(), filterObjStoreKey.vlanId(),
1714 filterObjStoreKey.isIpv4() ? "IPv4" : "IPv6");
Harshada Chaundkar9204f312019-07-02 16:01:24 +00001715 }
1716 }
1717
1718 /**
1719 * Verify if there are related filtering obj in the device.
1720 *
1721 * @param filteringKey the filtering obj key
1722 * @return true if related filtering obj are found
1723 */
1724 private boolean containsFilterInTheDevice(McastFilteringObjStoreKey filteringKey) {
1725 // check if filters are already added on the device
1726 McastFilteringObjStoreKey key = mcastFilteringObjStore.stream()
1727 .filter(mcastFilteringKey ->
1728 mcastFilteringKey.ingressCP().deviceId().equals(filteringKey.ingressCP().deviceId())
1729 && mcastFilteringKey.isIpv4() == filteringKey.isIpv4()
1730 && mcastFilteringKey.vlanId().equals(filteringKey.vlanId())
1731 ).findFirst().orElse(null);
1732 // we are interested to filt obj on the same device, same vlan and same ip type
1733 return key != null;
1734 }
1735
1736 /**
1737 * Update the filtering objective store upon device failure.
1738 *
1739 * @param affectedDevice the affected device
1740 */
1741 private void updateFilterObjStoreByDevice(DeviceId affectedDevice) {
1742 // purge the related filter objective key
1743 Set<McastFilteringObjStoreKey> filterObjs = Sets.newHashSet(mcastFilteringObjStore);
1744 Iterator<McastFilteringObjStoreKey> filterIterator = filterObjs.iterator();
1745 McastFilteringObjStoreKey filterKey;
1746 while (filterIterator.hasNext()) {
1747 filterKey = filterIterator.next();
1748 if (filterKey.ingressCP().deviceId().equals(affectedDevice)) {
1749 mcastFilteringObjStore.remove(filterKey);
1750 }
1751 }
1752 }
1753
1754 /**
1755 * Update the filtering objective store upon port failure.
1756 *
1757 * @param affectedPort the affected port
1758 */
1759 private void updateFilterObjStoreByPort(ConnectPoint affectedPort) {
1760 // purge the related filter objective key
1761 Set<McastFilteringObjStoreKey> filterObjs = Sets.newHashSet(mcastFilteringObjStore);
1762 Iterator<McastFilteringObjStoreKey> filterIterator = filterObjs.iterator();
1763 McastFilteringObjStoreKey filterKey;
1764 while (filterIterator.hasNext()) {
1765 filterKey = filterIterator.next();
1766 if (filterKey.ingressCP().equals(affectedPort)) {
1767 mcastFilteringObjStore.remove(filterKey);
1768 }
1769 }
1770 }
1771
1772 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +01001773 * Performs bucket verification operation for all mcast groups in the devices.
1774 * Firstly, it verifies that mcast is stable before trying verification operation.
1775 * Verification consists in creating new nexts with VERIFY operation. Actually,
1776 * the operation is totally delegated to the driver.
1777 */
Piere99511d2018-04-19 16:47:06 +02001778 private final class McastBucketCorrector implements Runnable {
pierc32ef422020-01-27 17:45:03 +01001779 private final AtomicInteger verifyOnFlight = new AtomicInteger(0);
1780 // Define the context used for the back pressure mechanism
1781 private final ObjectiveContext context = new DefaultObjectiveContext(
1782 (objective) -> {
1783 synchronized (verifyOnFlight) {
pier62e0b072019-12-23 19:21:49 +01001784 log.trace("Verify {} done", objective.id());
1785 verifyOnFlight.updateAndGet(i -> i > 0 ? i - 1 : i);
pierc32ef422020-01-27 17:45:03 +01001786 verifyOnFlight.notify();
1787 }
1788 },
1789 (objective, error) -> {
1790 synchronized (verifyOnFlight) {
pier62e0b072019-12-23 19:21:49 +01001791 log.trace("Verify {} error {}", objective.id(), error);
1792 verifyOnFlight.updateAndGet(i -> i > 0 ? i - 1 : i);
pierc32ef422020-01-27 17:45:03 +01001793 verifyOnFlight.notify();
1794 }
1795 });
1796
Pier Luigi35dab3f2018-01-25 16:16:02 +01001797 @Override
1798 public void run() {
Pier Luigi35dab3f2018-01-25 16:16:02 +01001799 try {
1800 // Iterates over the routes and verify the related next objectives
pierc32ef422020-01-27 17:45:03 +01001801 for (McastRoute mcastRoute : srManager.multicastRouteService.getRoutes()) {
pier62e0b072019-12-23 19:21:49 +01001802 if (!isMcastStable() || wasBktCorrRunning()) {
1803 return;
1804 }
pierc32ef422020-01-27 17:45:03 +01001805 IpAddress mcastIp = mcastRoute.group();
1806 log.trace("Running mcast buckets corrector for mcast group: {}", mcastIp);
1807 // Verify leadership on the operation
1808 if (!mcastUtils.isLeader(mcastIp)) {
1809 log.trace("Skip {} due to lack of leadership", mcastIp);
1810 continue;
1811 }
1812 // Get sources and sinks from Mcast Route Service and warn about errors
1813 Set<ConnectPoint> sources = mcastUtils.getSources(mcastIp);
1814 Set<ConnectPoint> sinks = mcastUtils.getSinks(mcastIp).values().stream()
1815 .flatMap(Collection::stream).collect(Collectors.toSet());
1816 // Do not proceed if sources of this group are missing
1817 if (sources.isEmpty()) {
1818 if (!sinks.isEmpty()) {
1819 log.warn("Unable to run buckets corrector. " +
1820 "Missing source {} for group {}", sources, mcastIp);
Piere99511d2018-04-19 16:47:06 +02001821 }
pierc32ef422020-01-27 17:45:03 +01001822 continue;
1823 }
1824 // For each group we get current information in the store
1825 // and issue a check of the next objectives in place
1826 Set<McastStoreKey> processedKeys = Sets.newHashSet();
1827 for (ConnectPoint source : sources) {
1828 Set<DeviceId> ingressDevices = getDevice(mcastIp, INGRESS, source);
1829 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
1830 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
1831 // Do not proceed if ingress devices are missing
1832 if (ingressDevices.isEmpty()) {
Pier Luigi92e69be2018-03-02 12:53:37 +01001833 if (!sinks.isEmpty()) {
1834 log.warn("Unable to run buckets corrector. " +
pierc32ef422020-01-27 17:45:03 +01001835 "Missing ingress {} for source {} and for group {}",
1836 ingressDevices, source, mcastIp);
Pier Luigi92e69be2018-03-02 12:53:37 +01001837 }
pierc32ef422020-01-27 17:45:03 +01001838 continue;
Pier Luigi35dab3f2018-01-25 16:16:02 +01001839 }
pierc32ef422020-01-27 17:45:03 +01001840 // Create the set of the devices to be processed
1841 ImmutableSet.Builder<DeviceId> devicesBuilder = ImmutableSet.builder();
pier62e0b072019-12-23 19:21:49 +01001842 devicesBuilder.addAll(ingressDevices);
pierc32ef422020-01-27 17:45:03 +01001843 if (!transitDevices.isEmpty()) {
1844 devicesBuilder.addAll(transitDevices);
1845 }
1846 if (!egressDevices.isEmpty()) {
1847 devicesBuilder.addAll(egressDevices);
1848 }
1849 Set<DeviceId> devicesToProcess = devicesBuilder.build();
1850 for (DeviceId deviceId : devicesToProcess) {
1851 if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
1852 log.trace("Skipping Bucket corrector for unconfigured device {}", deviceId);
Harshada Chaundkar9204f312019-07-02 16:01:24 +00001853 continue;
Pier Luigi35dab3f2018-01-25 16:16:02 +01001854 }
pierc32ef422020-01-27 17:45:03 +01001855 synchronized (verifyOnFlight) {
1856 while (verifyOnFlight.get() == MAX_VERIFY_ON_FLIGHT) {
1857 verifyOnFlight.wait();
Vignesh Ethiraj75790122019-08-26 12:18:42 +00001858 }
pierc32ef422020-01-27 17:45:03 +01001859 }
1860 VlanId assignedVlan = mcastUtils.assignedVlan(deviceId.equals(source.deviceId()) ?
1861 source : null);
1862 McastStoreKey currentKey = new McastStoreKey(mcastIp, deviceId, assignedVlan);
1863 // Check if we already processed this next - trees merge at some point
1864 if (processedKeys.contains(currentKey)) {
1865 continue;
1866 }
1867 // Verify the nextobjective or skip to next device
1868 if (mcastNextObjStore.containsKey(currentKey)) {
1869 NextObjective currentNext = mcastNextObjStore.get(currentKey).value();
1870 // Rebuild the next objective using assigned vlan
1871 currentNext = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
1872 mcastUtils.getPorts(currentNext.next()), currentNext.id()).verify(context);
1873 // Send to the flowobjective service
1874 srManager.flowObjectiveService.next(deviceId, currentNext);
1875 verifyOnFlight.incrementAndGet();
1876 log.trace("Verify on flight {}", verifyOnFlight);
1877 processedKeys.add(currentKey);
1878 } else {
1879 log.warn("Unable to run buckets corrector. " +
1880 "Missing next for {}, for source {} and for group {}",
1881 deviceId, source, mcastIp);
1882 }
1883 }
1884 }
pier62e0b072019-12-23 19:21:49 +01001885 // Let's wait the group before start the next one
1886 synchronized (verifyOnFlight) {
1887 while (verifyOnFlight.get() > 0) {
1888 verifyOnFlight.wait();
1889 }
1890 }
pierc32ef422020-01-27 17:45:03 +01001891 }
1892 } catch (InterruptedException e) {
1893 log.warn("BktCorr has been interrupted");
Pier Luigi35dab3f2018-01-25 16:16:02 +01001894 } finally {
pier62e0b072019-12-23 19:21:49 +01001895 lastBktCorrExecution.set(Instant.now());
Pier Luigi35dab3f2018-01-25 16:16:02 +01001896 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001897 }
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001898 }
Pier Luigi0f9635b2018-01-15 18:06:43 +01001899
Piere99511d2018-04-19 16:47:06 +02001900 /**
1901 * Returns the associated next ids to the mcast groups or to the single
1902 * group if mcastIp is present.
1903 *
1904 * @param mcastIp the group ip
1905 * @return the mapping mcastIp-device to next id
1906 */
Charles Chan0b1dd7e2018-08-19 19:21:46 -07001907 public Map<McastStoreKey, Integer> getNextIds(IpAddress mcastIp) {
pier9e02ab72020-02-12 20:40:55 +01001908 log.info("mcastNexts {}", mcastNextObjStore.size());
Pier Luigi0f9635b2018-01-15 18:06:43 +01001909 if (mcastIp != null) {
1910 return mcastNextObjStore.entrySet().stream()
1911 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
Piere99511d2018-04-19 16:47:06 +02001912 .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().value().id()));
Pier Luigi0f9635b2018-01-15 18:06:43 +01001913 }
Pier Luigi0f9635b2018-01-15 18:06:43 +01001914 return mcastNextObjStore.entrySet().stream()
Piere99511d2018-04-19 16:47:06 +02001915 .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().value().id()));
Pier Luigi0f9635b2018-01-15 18:06:43 +01001916 }
1917
Pier71c55772018-04-17 17:25:22 +02001918 /**
Charles Chan0b1dd7e2018-08-19 19:21:46 -07001919 * Removes given next ID from mcast next id store.
1920 *
1921 * @param nextId next id
1922 */
1923 public void removeNextId(int nextId) {
1924 mcastNextObjStore.entrySet().forEach(e -> {
1925 if (e.getValue().value().id() == nextId) {
1926 mcastNextObjStore.remove(e.getKey());
1927 }
1928 });
1929 }
1930
1931 /**
pier9e02ab72020-02-12 20:40:55 +01001932 * Build the mcast paths.
1933 *
1934 * @param storedPaths mcast tree
1935 * @param mcastIp the group ip
1936 * @param source the source
1937 */
1938 private Map<ConnectPoint, List<ConnectPoint>> buildMcastPaths(Collection<? extends List<Link>> storedPaths,
1939 IpAddress mcastIp, ConnectPoint source) {
1940 Map<ConnectPoint, List<ConnectPoint>> mcastTree = Maps.newHashMap();
1941 // Local sinks
1942 Set<ConnectPoint> localSinks = getSinks(mcastIp, source.deviceId(), source);
1943 localSinks.forEach(localSink -> mcastTree.put(localSink, Lists.newArrayList(localSink, source)));
1944 // Remote sinks
1945 storedPaths.forEach(path -> {
1946 List<Link> links = path;
1947 DeviceId egressDevice = links.get(links.size() - 1).dst().deviceId();
1948 Set<ConnectPoint> remoteSinks = getSinks(mcastIp, egressDevice, source);
1949 List<ConnectPoint> connectPoints = Lists.newArrayList(source);
1950 links.forEach(link -> {
1951 connectPoints.add(link.src());
1952 connectPoints.add(link.dst());
1953 });
1954 Collections.reverse(connectPoints);
1955 remoteSinks.forEach(remoteSink -> {
1956 List<ConnectPoint> finalPath = Lists.newArrayList(connectPoints);
1957 finalPath.add(0, remoteSink);
1958 mcastTree.put(remoteSink, finalPath);
1959 });
1960 });
1961 return mcastTree;
1962 }
1963
1964 /**
Piere99511d2018-04-19 16:47:06 +02001965 * Returns the associated roles to the mcast groups.
1966 *
1967 * @param mcastIp the group ip
1968 * @param sourcecp the source connect point
1969 * @return the mapping mcastIp-device to mcast role
1970 */
Charles Chanba59dd62018-05-10 22:19:49 +00001971 public Map<McastRoleStoreKey, McastRole> getMcastRoles(IpAddress mcastIp,
1972 ConnectPoint sourcecp) {
pier9e02ab72020-02-12 20:40:55 +01001973 log.info("mcastRoles {}", mcastRoleStore.size());
Piere99511d2018-04-19 16:47:06 +02001974 if (mcastIp != null) {
1975 Map<McastRoleStoreKey, McastRole> roles = mcastRoleStore.entrySet().stream()
1976 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
1977 .collect(Collectors.toMap(entry -> new McastRoleStoreKey(entry.getKey().mcastIp(),
1978 entry.getKey().deviceId(), entry.getKey().source()), entry -> entry.getValue().value()));
1979 if (sourcecp != null) {
1980 roles = roles.entrySet().stream()
1981 .filter(mcastEntry -> sourcecp.equals(mcastEntry.getKey().source()))
1982 .collect(Collectors.toMap(entry -> new McastRoleStoreKey(entry.getKey().mcastIp(),
1983 entry.getKey().deviceId(), entry.getKey().source()), Entry::getValue));
1984 }
1985 return roles;
1986 }
1987 return mcastRoleStore.entrySet().stream()
1988 .collect(Collectors.toMap(entry -> new McastRoleStoreKey(entry.getKey().mcastIp(),
1989 entry.getKey().deviceId(), entry.getKey().source()), entry -> entry.getValue().value()));
1990 }
1991
Pier71c55772018-04-17 17:25:22 +02001992 /**
1993 * Returns the associated trees to the mcast group.
1994 *
1995 * @param mcastIp the group ip
1996 * @param sourcecp the source connect point
1997 * @return the mapping egress point to mcast path
1998 */
Charles Chanba59dd62018-05-10 22:19:49 +00001999 public Multimap<ConnectPoint, List<ConnectPoint>> getMcastTrees(IpAddress mcastIp,
2000 ConnectPoint sourcecp) {
pier9e02ab72020-02-12 20:40:55 +01002001 // TODO remove
2002 log.info("{}", getStoredPaths(mcastIp));
Pier71c55772018-04-17 17:25:22 +02002003 Multimap<ConnectPoint, List<ConnectPoint>> mcastTrees = HashMultimap.create();
Pier71c55772018-04-17 17:25:22 +02002004 Set<ConnectPoint> sources = mcastUtils.getSources(mcastIp);
Pier71c55772018-04-17 17:25:22 +02002005 if (sourcecp != null) {
2006 sources = sources.stream()
Piere99511d2018-04-19 16:47:06 +02002007 .filter(source -> source.equals(sourcecp)).collect(Collectors.toSet());
Pier71c55772018-04-17 17:25:22 +02002008 }
Pier71c55772018-04-17 17:25:22 +02002009 if (!sources.isEmpty()) {
2010 sources.forEach(source -> {
pier9e02ab72020-02-12 20:40:55 +01002011 McastPathStoreKey pathStoreKey = new McastPathStoreKey(mcastIp, source);
2012 Collection<? extends List<Link>> storedPaths = Versioned.valueOrElse(
2013 mcastPathStore.get(pathStoreKey), Lists.newArrayList());
2014 // TODO remove
2015 log.info("Paths for group {} and source {} - {}", mcastIp, source, storedPaths.size());
2016 Map<ConnectPoint, List<ConnectPoint>> mcastTree = buildMcastPaths(storedPaths, mcastIp, source);
2017 mcastTree.forEach(mcastTrees::put);
Pier71c55772018-04-17 17:25:22 +02002018 });
2019 }
2020 return mcastTrees;
2021 }
2022
2023 /**
Pierdb27b8d2018-04-17 16:29:56 +02002024 * Return the leaders of the mcast groups.
2025 *
2026 * @param mcastIp the group ip
2027 * @return the mapping group-node
2028 */
2029 public Map<IpAddress, NodeId> getMcastLeaders(IpAddress mcastIp) {
2030 return mcastUtils.getMcastLeaders(mcastIp);
2031 }
Harshada Chaundkar9204f312019-07-02 16:01:24 +00002032
2033 /**
2034 * Returns the mcast filtering obj.
2035 *
2036 * @return the mapping group-node
2037 */
2038 public Map<DeviceId, List<McastFilteringObjStoreKey>> getMcastFilters() {
pier9e02ab72020-02-12 20:40:55 +01002039 // TODO remove
2040 log.info("mcastFilters {}", mcastFilteringObjStore.size());
Harshada Chaundkar9204f312019-07-02 16:01:24 +00002041 Map<DeviceId, List<McastFilteringObjStoreKey>> mapping = Maps.newHashMap();
2042 Set<McastFilteringObjStoreKey> currentKeys = Sets.newHashSet(mcastFilteringObjStore);
2043 currentKeys.forEach(filteringObjStoreKey ->
2044 mapping.compute(filteringObjStoreKey.ingressCP().deviceId(), (k, v) -> {
2045 List<McastFilteringObjStoreKey> values = v;
2046 if (values == null) {
2047 values = Lists.newArrayList();
2048 }
2049 values.add(filteringObjStoreKey);
2050 return values;
2051 })
2052 );
2053 return mapping;
2054 }
Charles Chanc91c8782016-03-30 17:54:24 -07002055}