blob: ae6902df3ebabb1ec0c2f897b6c715ed8c3cdbb7 [file] [log] [blame]
Charles Chanc91c8782016-03-30 17:54:24 -07001/*
Pier Luigi69f774d2018-02-28 12:10:50 +01002 * Copyright 2018-present Open Networking Foundation
Charles Chanc91c8782016-03-30 17:54:24 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Pier Luigi69f774d2018-02-28 12:10:50 +010017package org.onosproject.segmentrouting.mcast;
Charles Chanc91c8782016-03-30 17:54:24 -070018
Piere99511d2018-04-19 16:47:06 +020019import com.google.common.base.Objects;
Pier Luigid29ca7c2018-02-28 17:24:03 +010020import com.google.common.cache.Cache;
21import com.google.common.cache.CacheBuilder;
22import com.google.common.cache.RemovalCause;
23import com.google.common.cache.RemovalNotification;
Pier71c55772018-04-17 17:25:22 +020024import com.google.common.collect.HashMultimap;
Pier7b657162018-03-27 11:29:42 -070025import com.google.common.collect.ImmutableList;
Charles Chanc91c8782016-03-30 17:54:24 -070026import com.google.common.collect.ImmutableSet;
27import com.google.common.collect.Lists;
Pier Luigi91573e12018-01-23 16:06:38 +010028import com.google.common.collect.Maps;
Pier71c55772018-04-17 17:25:22 +020029import com.google.common.collect.Multimap;
Charles Chanc91c8782016-03-30 17:54:24 -070030import com.google.common.collect.Sets;
Charles Chanc91c8782016-03-30 17:54:24 -070031import org.onlab.packet.IpAddress;
Charles Chanc91c8782016-03-30 17:54:24 -070032import org.onlab.packet.VlanId;
33import org.onlab.util.KryoNamespace;
Pierdb27b8d2018-04-17 16:29:56 +020034import org.onosproject.cluster.NodeId;
Charles Chanc91c8782016-03-30 17:54:24 -070035import org.onosproject.core.ApplicationId;
36import org.onosproject.core.CoreService;
Pier1f87aca2018-03-14 16:47:32 -070037import org.onosproject.mcast.api.McastEvent;
38import org.onosproject.mcast.api.McastRoute;
Pier7b657162018-03-27 11:29:42 -070039import org.onosproject.mcast.api.McastRouteData;
Pier1f87aca2018-03-14 16:47:32 -070040import org.onosproject.mcast.api.McastRouteUpdate;
Charles Chanba59dd62018-05-10 22:19:49 +000041import org.onosproject.net.HostId;
Charles Chanc91c8782016-03-30 17:54:24 -070042import org.onosproject.net.ConnectPoint;
43import org.onosproject.net.DeviceId;
44import org.onosproject.net.Link;
45import org.onosproject.net.Path;
46import org.onosproject.net.PortNumber;
Charles Chan72779502016-04-23 17:36:10 -070047import org.onosproject.net.flowobjective.DefaultObjectiveContext;
Charles Chanc91c8782016-03-30 17:54:24 -070048import org.onosproject.net.flowobjective.ForwardingObjective;
49import org.onosproject.net.flowobjective.NextObjective;
Charles Chan72779502016-04-23 17:36:10 -070050import org.onosproject.net.flowobjective.ObjectiveContext;
Pier1f87aca2018-03-14 16:47:32 -070051import org.onosproject.net.topology.LinkWeigher;
Pier Luigid8a15162018-02-15 16:33:08 +010052import org.onosproject.net.topology.Topology;
Charles Chanc91c8782016-03-30 17:54:24 -070053import org.onosproject.net.topology.TopologyService;
Pier1f87aca2018-03-14 16:47:32 -070054import org.onosproject.segmentrouting.SRLinkWeigher;
Pier Luigi69f774d2018-02-28 12:10:50 +010055import org.onosproject.segmentrouting.SegmentRoutingManager;
Charles Chanc91c8782016-03-30 17:54:24 -070056import org.onosproject.store.serializers.KryoNamespaces;
57import org.onosproject.store.service.ConsistentMap;
58import org.onosproject.store.service.Serializer;
Andrea Campanella5b4cd652018-06-05 14:19:21 +020059import org.onosproject.store.service.Versioned;
Charles Chanc91c8782016-03-30 17:54:24 -070060import org.slf4j.Logger;
61import org.slf4j.LoggerFactory;
62
Pier Luigi35dab3f2018-01-25 16:16:02 +010063import java.time.Instant;
Charles Chanc91c8782016-03-30 17:54:24 -070064import java.util.Collection;
65import java.util.Collections;
Pier Luigi91573e12018-01-23 16:06:38 +010066import java.util.Comparator;
Charles Chanc91c8782016-03-30 17:54:24 -070067import java.util.List;
Charles Chan72779502016-04-23 17:36:10 -070068import java.util.Map;
Pier1f87aca2018-03-14 16:47:32 -070069import java.util.Map.Entry;
Charles Chanc91c8782016-03-30 17:54:24 -070070import java.util.Optional;
71import java.util.Set;
Pier Luigi35dab3f2018-01-25 16:16:02 +010072import java.util.concurrent.ScheduledExecutorService;
73import java.util.concurrent.TimeUnit;
pierc32ef422020-01-27 17:45:03 +010074import java.util.concurrent.atomic.AtomicInteger;
Pier Luigi35dab3f2018-01-25 16:16:02 +010075import java.util.concurrent.locks.Lock;
76import java.util.concurrent.locks.ReentrantLock;
Charles Chan72779502016-04-23 17:36:10 -070077import java.util.stream.Collectors;
78
Pier Luigi35dab3f2018-01-25 16:16:02 +010079import static java.util.concurrent.Executors.newScheduledThreadPool;
80import static org.onlab.util.Tools.groupedThreads;
Charles Chanba59dd62018-05-10 22:19:49 +000081
Pierdb27b8d2018-04-17 16:29:56 +020082import static org.onosproject.mcast.api.McastEvent.Type.ROUTE_ADDED;
Pier7b657162018-03-27 11:29:42 -070083import static org.onosproject.mcast.api.McastEvent.Type.ROUTE_REMOVED;
Andrea Campanellaef30d7a2018-04-27 14:44:15 +020084import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_ADDED;
85import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_REMOVED;
Charles Chanba59dd62018-05-10 22:19:49 +000086import static org.onosproject.mcast.api.McastEvent.Type.SINKS_ADDED;
87import static org.onosproject.mcast.api.McastEvent.Type.SINKS_REMOVED;
88
Pier979e61a2018-03-07 11:42:50 +010089import static org.onosproject.segmentrouting.mcast.McastRole.EGRESS;
90import static org.onosproject.segmentrouting.mcast.McastRole.INGRESS;
91import static org.onosproject.segmentrouting.mcast.McastRole.TRANSIT;
Charles Chanc91c8782016-03-30 17:54:24 -070092
93/**
Pier Luigi69f774d2018-02-28 12:10:50 +010094 * Handles Multicast related events.
Charles Chanc91c8782016-03-30 17:54:24 -070095 */
Charles Chan1eaf4802016-04-18 13:44:03 -070096public class McastHandler {
97 private static final Logger log = LoggerFactory.getLogger(McastHandler.class);
Charles Chanc91c8782016-03-30 17:54:24 -070098 private final SegmentRoutingManager srManager;
Charles Chan82f19972016-05-17 13:13:55 -070099 private final TopologyService topologyService;
Pierdb27b8d2018-04-17 16:29:56 +0200100 private final McastUtils mcastUtils;
Charles Chan72779502016-04-23 17:36:10 -0700101 private final ConsistentMap<McastStoreKey, NextObjective> mcastNextObjStore;
Piere99511d2018-04-19 16:47:06 +0200102 private final ConsistentMap<McastRoleStoreKey, McastRole> mcastRoleStore;
Charles Chan72779502016-04-23 17:36:10 -0700103
Pier Luigid29ca7c2018-02-28 17:24:03 +0100104 // Wait time for the cache
105 private static final int WAIT_TIME_MS = 1000;
Pier7b657162018-03-27 11:29:42 -0700106
Piere99511d2018-04-19 16:47:06 +0200107 //The mcastEventCache is implemented to avoid race condition by giving more time
108 // to the underlying subsystems to process previous calls.
Pier Luigid29ca7c2018-02-28 17:24:03 +0100109 private Cache<McastCacheKey, McastEvent> mcastEventCache = CacheBuilder.newBuilder()
110 .expireAfterWrite(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
111 .removalListener((RemovalNotification<McastCacheKey, McastEvent> notification) -> {
Pier Luigid29ca7c2018-02-28 17:24:03 +0100112 IpAddress mcastIp = notification.getKey().mcastIp();
Pier7b657162018-03-27 11:29:42 -0700113 HostId sink = notification.getKey().sinkHost();
Pier Luigid29ca7c2018-02-28 17:24:03 +0100114 McastEvent mcastEvent = notification.getValue();
115 RemovalCause cause = notification.getCause();
Piere99511d2018-04-19 16:47:06 +0200116 // If it expires or it has been replaced, we deque the event - no when evicted
Pier Luigid29ca7c2018-02-28 17:24:03 +0100117 switch (notification.getCause()) {
118 case REPLACED:
119 case EXPIRED:
120 dequeueMcastEvent(mcastEvent);
121 break;
122 default:
123 break;
124 }
125 }).build();
126
127 private void enqueueMcastEvent(McastEvent mcastEvent) {
Pier1f87aca2018-03-14 16:47:32 -0700128 final McastRouteUpdate mcastRouteUpdate = mcastEvent.subject();
Pier7b657162018-03-27 11:29:42 -0700129 final McastRouteUpdate mcastRoutePrevUpdate = mcastEvent.prevSubject();
130 final IpAddress group = mcastRoutePrevUpdate.route().group();
Pier7b657162018-03-27 11:29:42 -0700131 ImmutableSet.Builder<HostId> sinksBuilder = ImmutableSet.builder();
Pier1f87aca2018-03-14 16:47:32 -0700132 if (mcastEvent.type() == SOURCES_ADDED ||
133 mcastEvent.type() == SOURCES_REMOVED) {
Piere99511d2018-04-19 16:47:06 +0200134 // Current subject and prev just differ for the source connect points
135 sinksBuilder.addAll(mcastRouteUpdate.sinks().keySet());
Pier7b657162018-03-27 11:29:42 -0700136 } else if (mcastEvent.type() == SINKS_ADDED) {
Pier7b657162018-03-27 11:29:42 -0700137 mcastRouteUpdate.sinks().forEach(((hostId, connectPoints) -> {
138 // Get the previous locations and verify if there are changes
139 Set<ConnectPoint> prevConnectPoints = mcastRoutePrevUpdate.sinks().get(hostId);
140 Set<ConnectPoint> changes = Sets.difference(connectPoints, prevConnectPoints != null ?
141 prevConnectPoints : Collections.emptySet());
142 if (!changes.isEmpty()) {
143 sinksBuilder.add(hostId);
Pier1f87aca2018-03-14 16:47:32 -0700144 }
Pier7b657162018-03-27 11:29:42 -0700145 }));
146 } else if (mcastEvent.type() == SINKS_REMOVED) {
Pier7b657162018-03-27 11:29:42 -0700147 mcastRoutePrevUpdate.sinks().forEach(((hostId, connectPoints) -> {
148 // Get the current locations and verify if there are changes
149 Set<ConnectPoint> currentConnectPoints = mcastRouteUpdate.sinks().get(hostId);
150 Set<ConnectPoint> changes = Sets.difference(connectPoints, currentConnectPoints != null ?
151 currentConnectPoints : Collections.emptySet());
152 if (!changes.isEmpty()) {
153 sinksBuilder.add(hostId);
154 }
155 }));
156 } else if (mcastEvent.type() == ROUTE_REMOVED) {
157 // Current subject is null, just take the previous host ids
158 sinksBuilder.addAll(mcastRoutePrevUpdate.sinks().keySet());
Pier Luigid29ca7c2018-02-28 17:24:03 +0100159 }
Pier Luigid29ca7c2018-02-28 17:24:03 +0100160 sinksBuilder.build().forEach(sink -> {
Pier1f87aca2018-03-14 16:47:32 -0700161 McastCacheKey cacheKey = new McastCacheKey(group, sink);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100162 mcastEventCache.put(cacheKey, mcastEvent);
163 });
164 }
165
166 private void dequeueMcastEvent(McastEvent mcastEvent) {
Pier7b657162018-03-27 11:29:42 -0700167 final McastRouteUpdate mcastUpdate = mcastEvent.subject();
168 final McastRouteUpdate mcastPrevUpdate = mcastEvent.prevSubject();
Pier7b657162018-03-27 11:29:42 -0700169 IpAddress mcastIp = mcastPrevUpdate.route().group();
Pier7b657162018-03-27 11:29:42 -0700170 Set<ConnectPoint> prevSinks = mcastPrevUpdate.sinks()
Piere99511d2018-04-19 16:47:06 +0200171 .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
172 Set<ConnectPoint> prevSources = mcastPrevUpdate.sources()
173 .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
174 Set<ConnectPoint> sources;
Pier Luigid29ca7c2018-02-28 17:24:03 +0100175 switch (mcastEvent.type()) {
Pier1f87aca2018-03-14 16:47:32 -0700176 case SOURCES_ADDED:
Piere99511d2018-04-19 16:47:06 +0200177 sources = mcastUpdate.sources()
178 .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
179 Set<ConnectPoint> sourcesToBeAdded = Sets.difference(sources, prevSources);
180 processSourcesAddedInternal(sourcesToBeAdded, mcastIp, mcastUpdate.sinks());
Pier Luigid29ca7c2018-02-28 17:24:03 +0100181 break;
Pier1f87aca2018-03-14 16:47:32 -0700182 case SOURCES_REMOVED:
Piere99511d2018-04-19 16:47:06 +0200183 sources = mcastUpdate.sources()
184 .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
185 Set<ConnectPoint> sourcesToBeRemoved = Sets.difference(prevSources, sources);
186 processSourcesRemovedInternal(sourcesToBeRemoved, sources, mcastIp, mcastUpdate.sinks());
Pier Luigid29ca7c2018-02-28 17:24:03 +0100187 break;
188 case ROUTE_REMOVED:
Piere99511d2018-04-19 16:47:06 +0200189 processRouteRemovedInternal(prevSources, mcastIp);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100190 break;
Pier1f87aca2018-03-14 16:47:32 -0700191 case SINKS_ADDED:
Piere99511d2018-04-19 16:47:06 +0200192 processSinksAddedInternal(prevSources, mcastIp, mcastUpdate.sinks(), prevSinks);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100193 break;
Pier1f87aca2018-03-14 16:47:32 -0700194 case SINKS_REMOVED:
Piere99511d2018-04-19 16:47:06 +0200195 processSinksRemovedInternal(prevSources, mcastIp, mcastUpdate.sinks(), mcastPrevUpdate.sinks());
Pier Luigid29ca7c2018-02-28 17:24:03 +0100196 break;
197 default:
198 break;
199 }
200 }
201
Pier Luigi35dab3f2018-01-25 16:16:02 +0100202 // Mcast lock to serialize local operations
203 private final Lock mcastLock = new ReentrantLock();
Pier Luigi35dab3f2018-01-25 16:16:02 +0100204 private void mcastLock() {
205 mcastLock.lock();
206 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100207 private void mcastUnlock() {
208 mcastLock.unlock();
209 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100210 // Stability threshold for Mcast. Seconds
211 private static final long MCAST_STABLITY_THRESHOLD = 5;
212 // Last change done
213 private Instant lastMcastChange = Instant.now();
pierc32ef422020-01-27 17:45:03 +0100214 // Last bucker corrector execution
215 private Instant lastBktCorrExecution = Instant.now();
Pier Luigi35dab3f2018-01-25 16:16:02 +0100216
217 /**
218 * Determines if mcast in the network has been stable in the last
219 * MCAST_STABLITY_THRESHOLD seconds, by comparing the current time
220 * to the last mcast change timestamp.
221 *
222 * @return true if stable
223 */
224 private boolean isMcastStable() {
225 long last = (long) (lastMcastChange.toEpochMilli() / 1000.0);
226 long now = (long) (Instant.now().toEpochMilli() / 1000.0);
pierc32ef422020-01-27 17:45:03 +0100227 log.trace("Multicast stable since {}s", now - last);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100228 return (now - last) > MCAST_STABLITY_THRESHOLD;
229 }
230
pierc32ef422020-01-27 17:45:03 +0100231 /**
232 * Assures there are always MCAST_VERIFY_INTERVAL seconds between each execution,
233 * by comparing the current time with the last corrector execution.
234 *
235 * @return true if stable
236 */
237 private boolean wasBktCorrRunning() {
238 long last = (long) (lastBktCorrExecution.toEpochMilli() / 1000.0);
239 long now = (long) (Instant.now().toEpochMilli() / 1000.0);
240 log.trace("McastBucketCorrector executed {}s ago", now - last);
241 return (now - last) < MCAST_VERIFY_INTERVAL;
242 }
243
Piere99511d2018-04-19 16:47:06 +0200244 // Verify interval for Mcast bucket corrector
Pier Luigi35dab3f2018-01-25 16:16:02 +0100245 private static final long MCAST_VERIFY_INTERVAL = 30;
Piere99511d2018-04-19 16:47:06 +0200246 // Executor for mcast bucket corrector and for cache
Pier Luigi35dab3f2018-01-25 16:16:02 +0100247 private ScheduledExecutorService executorService
Pier Luigid29ca7c2018-02-28 17:24:03 +0100248 = newScheduledThreadPool(1, groupedThreads("mcastWorker", "mcastWorker-%d", log));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100249
Charles Chan72779502016-04-23 17:36:10 -0700250 /**
Charles Chanc91c8782016-03-30 17:54:24 -0700251 * Constructs the McastEventHandler.
252 *
253 * @param srManager Segment Routing manager
254 */
Charles Chan1eaf4802016-04-18 13:44:03 -0700255 public McastHandler(SegmentRoutingManager srManager) {
Pier7b657162018-03-27 11:29:42 -0700256 ApplicationId coreAppId = srManager.coreService.getAppId(CoreService.CORE_APP_NAME);
Charles Chanc91c8782016-03-30 17:54:24 -0700257 this.srManager = srManager;
Charles Chanc91c8782016-03-30 17:54:24 -0700258 this.topologyService = srManager.topologyService;
Pier7b657162018-03-27 11:29:42 -0700259 KryoNamespace.Builder mcastKryo = new KryoNamespace.Builder()
Charles Chanc91c8782016-03-30 17:54:24 -0700260 .register(KryoNamespaces.API)
Piere99511d2018-04-19 16:47:06 +0200261 .register(new McastStoreKeySerializer(), McastStoreKey.class);
Pier7b657162018-03-27 11:29:42 -0700262 mcastNextObjStore = srManager.storageService
Charles Chan72779502016-04-23 17:36:10 -0700263 .<McastStoreKey, NextObjective>consistentMapBuilder()
Charles Chanc91c8782016-03-30 17:54:24 -0700264 .withName("onos-mcast-nextobj-store")
Charles Chan4922a172016-05-23 16:45:45 -0700265 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-NextObj")))
Charles Chanc91c8782016-03-30 17:54:24 -0700266 .build();
Piere99511d2018-04-19 16:47:06 +0200267 mcastKryo = new KryoNamespace.Builder()
268 .register(KryoNamespaces.API)
269 .register(new McastRoleStoreKeySerializer(), McastRoleStoreKey.class)
270 .register(McastRole.class);
Pier7b657162018-03-27 11:29:42 -0700271 mcastRoleStore = srManager.storageService
Piere99511d2018-04-19 16:47:06 +0200272 .<McastRoleStoreKey, McastRole>consistentMapBuilder()
Charles Chan72779502016-04-23 17:36:10 -0700273 .withName("onos-mcast-role-store")
Charles Chan4922a172016-05-23 16:45:45 -0700274 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-Role")))
Charles Chan72779502016-04-23 17:36:10 -0700275 .build();
Pier7b657162018-03-27 11:29:42 -0700276 mcastUtils = new McastUtils(srManager, coreAppId, log);
Piere99511d2018-04-19 16:47:06 +0200277 // Init the executor service, the buckets corrector and schedule the clean up
Pier Luigi35dab3f2018-01-25 16:16:02 +0100278 executorService.scheduleWithFixedDelay(new McastBucketCorrector(), 10,
Pier7b657162018-03-27 11:29:42 -0700279 MCAST_VERIFY_INTERVAL, TimeUnit.SECONDS);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100280 executorService.scheduleAtFixedRate(mcastEventCache::cleanUp, 0,
281 WAIT_TIME_MS, TimeUnit.MILLISECONDS);
Charles Chan72779502016-04-23 17:36:10 -0700282 }
283
284 /**
Piere99511d2018-04-19 16:47:06 +0200285 * Read initial multicast configuration from mcast store.
Charles Chan72779502016-04-23 17:36:10 -0700286 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100287 public void init() {
Pier7b657162018-03-27 11:29:42 -0700288 lastMcastChange = Instant.now();
289 mcastLock();
290 try {
291 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
Piere99511d2018-04-19 16:47:06 +0200292 log.debug("Init group {}", mcastRoute.group());
Pierdb27b8d2018-04-17 16:29:56 +0200293 if (!mcastUtils.isLeader(mcastRoute.group())) {
294 log.debug("Skip {} due to lack of leadership", mcastRoute.group());
295 return;
296 }
Pier7b657162018-03-27 11:29:42 -0700297 McastRouteData mcastRouteData = srManager.multicastRouteService.routeData(mcastRoute);
Piere99511d2018-04-19 16:47:06 +0200298 // For each source process the mcast tree
299 srManager.multicastRouteService.sources(mcastRoute).forEach(source -> {
300 Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
301 Set<DeviceId> visited = Sets.newHashSet();
302 List<ConnectPoint> currentPath = Lists.newArrayList(source);
Charles Chan0b1dd7e2018-08-19 19:21:46 -0700303 mcastUtils.buildMcastPaths(mcastNextObjStore.asJavaMap(), source.deviceId(), visited, mcastPaths,
Piere99511d2018-04-19 16:47:06 +0200304 currentPath, mcastRoute.group(), source);
305 // Get all the sinks and process them
306 Set<ConnectPoint> sinks = processSinksToBeAdded(source, mcastRoute.group(),
307 mcastRouteData.sinks());
308 // Filter out all the working sinks, we do not want to move them
309 // TODO we need a better way to distinguish flows coming from different sources
310 sinks = sinks.stream()
311 .filter(sink -> !mcastPaths.containsKey(sink) ||
312 !isSinkForSource(mcastRoute.group(), sink, source))
313 .collect(Collectors.toSet());
314 if (sinks.isEmpty()) {
315 log.debug("Skip {} for source {} nothing to do", mcastRoute.group(), source);
316 return;
317 }
piereaddb182020-02-03 13:50:53 +0100318 Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(mcastRoute.group(),
319 source.deviceId(), sinks);
Piere99511d2018-04-19 16:47:06 +0200320 mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink,
321 mcastRoute.group(), paths));
322 });
Pier7b657162018-03-27 11:29:42 -0700323 });
324 } finally {
325 mcastUnlock();
326 }
Charles Chanc91c8782016-03-30 17:54:24 -0700327 }
328
329 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +0100330 * Clean up when deactivating the application.
331 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100332 public void terminate() {
Pier72d0e582018-04-20 14:14:34 +0200333 mcastEventCache.invalidateAll();
Pier Luigi35dab3f2018-01-25 16:16:02 +0100334 executorService.shutdown();
Pier72d0e582018-04-20 14:14:34 +0200335 mcastNextObjStore.destroy();
336 mcastRoleStore.destroy();
337 mcastUtils.terminate();
338 log.info("Terminated");
Pier Luigi35dab3f2018-01-25 16:16:02 +0100339 }
340
341 /**
Pier Luigid29ca7c2018-02-28 17:24:03 +0100342 * Processes the SOURCE_ADDED, SOURCE_UPDATED, SINK_ADDED,
Piere99511d2018-04-19 16:47:06 +0200343 * SINK_REMOVED, ROUTE_ADDED and ROUTE_REMOVED events.
Charles Chanc91c8782016-03-30 17:54:24 -0700344 *
345 * @param event McastEvent with SOURCE_ADDED type
346 */
Pier Luigid29ca7c2018-02-28 17:24:03 +0100347 public void processMcastEvent(McastEvent event) {
Pierdb27b8d2018-04-17 16:29:56 +0200348 // If it is a route added, we do not enqueue
349 if (event.type() == ROUTE_ADDED) {
Pierdb27b8d2018-04-17 16:29:56 +0200350 processRouteAddedInternal(event.subject().route().group());
351 } else {
Pierdb27b8d2018-04-17 16:29:56 +0200352 enqueueMcastEvent(event);
353 }
Pier Luigi6786b922018-02-02 16:19:11 +0100354 }
355
356 /**
Piere99511d2018-04-19 16:47:06 +0200357 * Process the SOURCES_ADDED event.
358 *
359 * @param sources the sources connect point
360 * @param mcastIp the group address
361 * @param sinks the sinks connect points
362 */
363 private void processSourcesAddedInternal(Set<ConnectPoint> sources, IpAddress mcastIp,
364 Map<HostId, Set<ConnectPoint>> sinks) {
365 lastMcastChange = Instant.now();
366 mcastLock();
367 try {
piereaddb182020-02-03 13:50:53 +0100368 log.info("Processing sources added {} for group {}", sources, mcastIp);
Piere99511d2018-04-19 16:47:06 +0200369 if (!mcastUtils.isLeader(mcastIp)) {
370 log.debug("Skip {} due to lack of leadership", mcastIp);
371 return;
372 }
373 sources.forEach(source -> {
374 Set<ConnectPoint> sinksToBeAdded = processSinksToBeAdded(source, mcastIp, sinks);
piereaddb182020-02-03 13:50:53 +0100375 Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(mcastIp, source.deviceId(),
376 sinksToBeAdded);
Piere99511d2018-04-19 16:47:06 +0200377 mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink, mcastIp, paths));
378 });
379 } finally {
380 mcastUnlock();
381 }
382 }
383
384 /**
385 * Process the SOURCES_REMOVED event.
386 *
387 * @param sourcesToBeRemoved the source connect points to be removed
388 * @param remainingSources the remainig source connect points
389 * @param mcastIp the group address
390 * @param sinks the sinks connect points
391 */
392 private void processSourcesRemovedInternal(Set<ConnectPoint> sourcesToBeRemoved,
393 Set<ConnectPoint> remainingSources,
394 IpAddress mcastIp,
395 Map<HostId, Set<ConnectPoint>> sinks) {
396 lastMcastChange = Instant.now();
397 mcastLock();
398 try {
piereaddb182020-02-03 13:50:53 +0100399 log.info("Processing sources removed {} for group {}", sourcesToBeRemoved, mcastIp);
Piere99511d2018-04-19 16:47:06 +0200400 if (!mcastUtils.isLeader(mcastIp)) {
401 log.debug("Skip {} due to lack of leadership", mcastIp);
402 return;
403 }
404 if (remainingSources.isEmpty()) {
piereaddb182020-02-03 13:50:53 +0100405 log.debug("There are no more sources for {}", mcastIp);
Piere99511d2018-04-19 16:47:06 +0200406 processRouteRemovedInternal(sourcesToBeRemoved, mcastIp);
407 return;
408 }
409 // Skip offline devices
410 Set<ConnectPoint> candidateSources = sourcesToBeRemoved.stream()
411 .filter(source -> srManager.deviceService.isAvailable(source.deviceId()))
412 .collect(Collectors.toSet());
413 if (candidateSources.isEmpty()) {
414 log.debug("Skip {} due to empty sources to be removed", mcastIp);
415 return;
416 }
417 Set<Link> remainingLinks = Sets.newHashSet();
418 Map<ConnectPoint, Set<Link>> candidateLinks = Maps.newHashMap();
419 Map<ConnectPoint, Set<ConnectPoint>> candidateSinks = Maps.newHashMap();
420 Set<ConnectPoint> totalSources = Sets.newHashSet(candidateSources);
421 totalSources.addAll(remainingSources);
422 // Calculate all the links used by the sources
423 totalSources.forEach(source -> {
424 Set<ConnectPoint> currentSinks = sinks.values()
425 .stream().flatMap(Collection::stream)
426 .filter(sink -> isSinkForSource(mcastIp, sink, source))
427 .collect(Collectors.toSet());
428 candidateSinks.put(source, currentSinks);
429 currentSinks.forEach(currentSink -> {
430 Optional<Path> currentPath = getPath(source.deviceId(), currentSink.deviceId(),
431 mcastIp, null, source);
432 if (currentPath.isPresent()) {
433 if (!candidateSources.contains(source)) {
434 remainingLinks.addAll(currentPath.get().links());
435 } else {
436 candidateLinks.put(source, Sets.newHashSet(currentPath.get().links()));
437 }
438 }
439 });
440 });
441 // Clean transit links
442 candidateLinks.forEach((source, currentCandidateLinks) -> {
443 Set<Link> linksToBeRemoved = Sets.difference(currentCandidateLinks, remainingLinks)
444 .immutableCopy();
445 if (!linksToBeRemoved.isEmpty()) {
446 currentCandidateLinks.forEach(link -> {
447 DeviceId srcLink = link.src().deviceId();
448 // Remove ports only on links to be removed
449 if (linksToBeRemoved.contains(link)) {
450 removePortFromDevice(link.src().deviceId(), link.src().port(), mcastIp,
451 mcastUtils.assignedVlan(srcLink.equals(source.deviceId()) ?
452 source : null));
453 }
454 // Remove role on the candidate links
455 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, srcLink, source));
456 });
457 }
458 });
459 // Clean ingress and egress
460 candidateSources.forEach(source -> {
461 Set<ConnectPoint> currentSinks = candidateSinks.get(source);
462 currentSinks.forEach(currentSink -> {
463 VlanId assignedVlan = mcastUtils.assignedVlan(source.deviceId().equals(currentSink.deviceId()) ?
464 source : null);
465 // Sinks co-located with the source
466 if (source.deviceId().equals(currentSink.deviceId())) {
467 if (source.port().equals(currentSink.port())) {
468 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
469 mcastIp, currentSink, source);
470 return;
471 }
472 // We need to check against the other sources and if it is
473 // necessary remove the port from the device - no overlap
474 Set<VlanId> otherVlans = remainingSources.stream()
475 // Only sources co-located and having this sink
476 .filter(remainingSource -> remainingSource.deviceId()
477 .equals(source.deviceId()) && candidateSinks.get(remainingSource)
478 .contains(currentSink))
479 .map(remainingSource -> mcastUtils.assignedVlan(
480 remainingSource.deviceId().equals(currentSink.deviceId()) ?
481 remainingSource : null)).collect(Collectors.toSet());
482 if (!otherVlans.contains(assignedVlan)) {
483 removePortFromDevice(currentSink.deviceId(), currentSink.port(),
484 mcastIp, assignedVlan);
485 }
486 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, currentSink.deviceId(),
487 source));
488 return;
489 }
490 Set<VlanId> otherVlans = remainingSources.stream()
491 .filter(remainingSource -> candidateSinks.get(remainingSource)
492 .contains(currentSink))
493 .map(remainingSource -> mcastUtils.assignedVlan(
494 remainingSource.deviceId().equals(currentSink.deviceId()) ?
495 remainingSource : null)).collect(Collectors.toSet());
496 // Sinks on other leaves
497 if (!otherVlans.contains(assignedVlan)) {
498 removePortFromDevice(currentSink.deviceId(), currentSink.port(),
499 mcastIp, assignedVlan);
500 }
501 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, currentSink.deviceId(),
502 source));
503 });
504 });
505 } finally {
506 mcastUnlock();
507 }
508 }
509
510 /**
Pierdb27b8d2018-04-17 16:29:56 +0200511 * Process the ROUTE_ADDED event.
Pier Luigie80d6b42018-02-26 12:31:38 +0100512 *
Pierdb27b8d2018-04-17 16:29:56 +0200513 * @param mcastIp the group address
Pier Luigie80d6b42018-02-26 12:31:38 +0100514 */
Pierdb27b8d2018-04-17 16:29:56 +0200515 private void processRouteAddedInternal(IpAddress mcastIp) {
Pier Luigie80d6b42018-02-26 12:31:38 +0100516 lastMcastChange = Instant.now();
517 mcastLock();
518 try {
piereaddb182020-02-03 13:50:53 +0100519 log.info("Processing route added for Multicast group {}", mcastIp);
Pierdb27b8d2018-04-17 16:29:56 +0200520 // Just elect a new leader
521 mcastUtils.isLeader(mcastIp);
Pier Luigie80d6b42018-02-26 12:31:38 +0100522 } finally {
523 mcastUnlock();
524 }
525 }
526
527 /**
Pier Luigi6786b922018-02-02 16:19:11 +0100528 * Removes the entire mcast tree related to this group.
Piere99511d2018-04-19 16:47:06 +0200529 * @param sources the source connect points
Pier Luigi6786b922018-02-02 16:19:11 +0100530 * @param mcastIp multicast group IP address
531 */
Piere99511d2018-04-19 16:47:06 +0200532 private void processRouteRemovedInternal(Set<ConnectPoint> sources, IpAddress mcastIp) {
Pier Luigi6786b922018-02-02 16:19:11 +0100533 lastMcastChange = Instant.now();
534 mcastLock();
535 try {
piereaddb182020-02-03 13:50:53 +0100536 log.info("Processing route removed for group {}", mcastIp);
Pierdb27b8d2018-04-17 16:29:56 +0200537 if (!mcastUtils.isLeader(mcastIp)) {
538 log.debug("Skip {} due to lack of leadership", mcastIp);
539 mcastUtils.withdrawLeader(mcastIp);
540 return;
541 }
Piere99511d2018-04-19 16:47:06 +0200542 sources.forEach(source -> {
543 // Find out the ingress, transit and egress device of the affected group
544 DeviceId ingressDevice = getDevice(mcastIp, INGRESS, source)
545 .stream().findFirst().orElse(null);
546 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
547 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
548 // If there are no egress and transit devices, sinks could be only on the ingress
549 if (!egressDevices.isEmpty()) {
550 egressDevices.forEach(deviceId -> {
551 removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null));
552 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, deviceId, source));
553 });
554 }
555 if (!transitDevices.isEmpty()) {
556 transitDevices.forEach(deviceId -> {
557 removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null));
558 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, deviceId, source));
559 });
560 }
561 if (ingressDevice != null) {
562 removeGroupFromDevice(ingressDevice, mcastIp, mcastUtils.assignedVlan(source));
563 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, ingressDevice, source));
564 }
565 });
566 // Finally, withdraw the leadership
567 mcastUtils.withdrawLeader(mcastIp);
Pier Luigi6786b922018-02-02 16:19:11 +0100568 } finally {
569 mcastUnlock();
570 }
571 }
572
Pier7b657162018-03-27 11:29:42 -0700573 /**
574 * Process sinks to be removed.
575 *
Piere99511d2018-04-19 16:47:06 +0200576 * @param sources the source connect points
Pier7b657162018-03-27 11:29:42 -0700577 * @param mcastIp the ip address of the group
578 * @param newSinks the new sinks to be processed
Pier28164682018-04-17 15:50:43 +0200579 * @param prevSinks the previous sinks
Pier7b657162018-03-27 11:29:42 -0700580 */
Piere99511d2018-04-19 16:47:06 +0200581 private void processSinksRemovedInternal(Set<ConnectPoint> sources, IpAddress mcastIp,
Pier7b657162018-03-27 11:29:42 -0700582 Map<HostId, Set<ConnectPoint>> newSinks,
Pier28164682018-04-17 15:50:43 +0200583 Map<HostId, Set<ConnectPoint>> prevSinks) {
Pier7b657162018-03-27 11:29:42 -0700584 lastMcastChange = Instant.now();
585 mcastLock();
Pier7b657162018-03-27 11:29:42 -0700586 try {
piereaddb182020-02-03 13:50:53 +0100587 log.info("Processing sinks removed for group {} and for sources {}",
588 mcastIp, sources);
Pierdb27b8d2018-04-17 16:29:56 +0200589 if (!mcastUtils.isLeader(mcastIp)) {
590 log.debug("Skip {} due to lack of leadership", mcastIp);
591 return;
592 }
Piere99511d2018-04-19 16:47:06 +0200593 Map<ConnectPoint, Map<ConnectPoint, Optional<Path>>> treesToBeRemoved = Maps.newHashMap();
594 Map<ConnectPoint, Set<ConnectPoint>> treesToBeAdded = Maps.newHashMap();
595 sources.forEach(source -> {
596 // Save the path associated to the sinks to be removed
597 Set<ConnectPoint> sinksToBeRemoved = processSinksToBeRemoved(mcastIp, prevSinks,
598 newSinks, source);
599 Map<ConnectPoint, Optional<Path>> treeToBeRemoved = Maps.newHashMap();
600 sinksToBeRemoved.forEach(sink -> treeToBeRemoved.put(sink, getPath(source.deviceId(),
601 sink.deviceId(), mcastIp,
602 null, source)));
603 treesToBeRemoved.put(source, treeToBeRemoved);
604 // Recover the dual-homed sinks
605 Set<ConnectPoint> sinksToBeRecovered = processSinksToBeRecovered(mcastIp, newSinks,
606 prevSinks, source);
607 treesToBeAdded.put(source, sinksToBeRecovered);
608 });
609 // Remove the sinks taking into account the multiple sources and the original paths
610 treesToBeRemoved.forEach((source, tree) ->
611 tree.forEach((sink, path) -> processSinkRemovedInternal(source, sink, mcastIp, path)));
612 // Add new sinks according to the recovery procedure
613 treesToBeAdded.forEach((source, sinks) ->
614 sinks.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null)));
Pier7b657162018-03-27 11:29:42 -0700615 } finally {
616 mcastUnlock();
Pier7b657162018-03-27 11:29:42 -0700617 }
618 }
619
Pier Luigi6786b922018-02-02 16:19:11 +0100620 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +0100621 * Removes a path from source to sink for given multicast group.
622 *
623 * @param source connect point of the multicast source
624 * @param sink connection point of the multicast sink
625 * @param mcastIp multicast group IP address
Piere99511d2018-04-19 16:47:06 +0200626 * @param mcastPath path associated to the sink
Pier Luigi35dab3f2018-01-25 16:16:02 +0100627 */
628 private void processSinkRemovedInternal(ConnectPoint source, ConnectPoint sink,
Piere99511d2018-04-19 16:47:06 +0200629 IpAddress mcastIp, Optional<Path> mcastPath) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100630 lastMcastChange = Instant.now();
631 mcastLock();
632 try {
piereaddb182020-02-03 13:50:53 +0100633 log.info("Processing sink removed {} for group {} and for source {}", sink, mcastIp, source);
Pier7b657162018-03-27 11:29:42 -0700634 boolean isLast;
Pier Luigi35dab3f2018-01-25 16:16:02 +0100635 // When source and sink are on the same device
636 if (source.deviceId().equals(sink.deviceId())) {
637 // Source and sink are on even the same port. There must be something wrong.
638 if (source.port().equals(sink.port())) {
Piere99511d2018-04-19 16:47:06 +0200639 log.warn("Skip {} since sink {} is on the same port of source {}. Abort", mcastIp, sink, source);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100640 return;
641 }
Pier7b657162018-03-27 11:29:42 -0700642 isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
Pier Luigi92e69be2018-03-02 12:53:37 +0100643 if (isLast) {
Piere99511d2018-04-19 16:47:06 +0200644 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, sink.deviceId(), source));
Pier Luigi92e69be2018-03-02 12:53:37 +0100645 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100646 return;
647 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100648 // Process the egress device
Pier7b657162018-03-27 11:29:42 -0700649 isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100650 if (isLast) {
Piere99511d2018-04-19 16:47:06 +0200651 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, sink.deviceId(), source));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100652 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100653 // If this is the last sink on the device, also update upstream
Pier Luigi35dab3f2018-01-25 16:16:02 +0100654 if (mcastPath.isPresent()) {
655 List<Link> links = Lists.newArrayList(mcastPath.get().links());
656 Collections.reverse(links);
657 for (Link link : links) {
658 if (isLast) {
Piere99511d2018-04-19 16:47:06 +0200659 isLast = removePortFromDevice(link.src().deviceId(), link.src().port(), mcastIp,
660 mcastUtils.assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
Pier Luigi92e69be2018-03-02 12:53:37 +0100661 if (isLast) {
Piere99511d2018-04-19 16:47:06 +0200662 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, link.src().deviceId(), source));
Pier Luigi92e69be2018-03-02 12:53:37 +0100663 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100664 }
Charles Chanc91c8782016-03-30 17:54:24 -0700665 }
666 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100667 } finally {
668 mcastUnlock();
Charles Chanc91c8782016-03-30 17:54:24 -0700669 }
670 }
671
Pier7b657162018-03-27 11:29:42 -0700672
673 /**
674 * Process sinks to be added.
675 *
Piere99511d2018-04-19 16:47:06 +0200676 * @param sources the source connect points
Pier7b657162018-03-27 11:29:42 -0700677 * @param mcastIp the group IP
678 * @param newSinks the new sinks to be processed
679 * @param allPrevSinks all previous sinks
680 */
Piere99511d2018-04-19 16:47:06 +0200681 private void processSinksAddedInternal(Set<ConnectPoint> sources, IpAddress mcastIp,
Pier7b657162018-03-27 11:29:42 -0700682 Map<HostId, Set<ConnectPoint>> newSinks,
683 Set<ConnectPoint> allPrevSinks) {
684 lastMcastChange = Instant.now();
685 mcastLock();
686 try {
piereaddb182020-02-03 13:50:53 +0100687 log.info("Processing sinks added for group {} and for sources {}", mcastIp, sources);
Pierdb27b8d2018-04-17 16:29:56 +0200688 if (!mcastUtils.isLeader(mcastIp)) {
689 log.debug("Skip {} due to lack of leadership", mcastIp);
690 return;
691 }
Piere99511d2018-04-19 16:47:06 +0200692 sources.forEach(source -> {
693 Set<ConnectPoint> sinksToBeAdded = processSinksToBeAdded(source, mcastIp, newSinks);
694 sinksToBeAdded = Sets.difference(sinksToBeAdded, allPrevSinks);
695 sinksToBeAdded.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null));
696 });
Pier7b657162018-03-27 11:29:42 -0700697 } finally {
698 mcastUnlock();
699 }
700 }
701
Charles Chanc91c8782016-03-30 17:54:24 -0700702 /**
703 * Establishes a path from source to sink for given multicast group.
704 *
705 * @param source connect point of the multicast source
706 * @param sink connection point of the multicast sink
707 * @param mcastIp multicast group IP address
708 */
709 private void processSinkAddedInternal(ConnectPoint source, ConnectPoint sink,
Pier7b657162018-03-27 11:29:42 -0700710 IpAddress mcastIp, List<Path> allPaths) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100711 lastMcastChange = Instant.now();
712 mcastLock();
713 try {
piereaddb182020-02-03 13:50:53 +0100714 log.info("Processing sink added {} for group {} and for source {}", sink, mcastIp, source);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100715 // Process the ingress device
Pier7b657162018-03-27 11:29:42 -0700716 mcastUtils.addFilterToDevice(source.deviceId(), source.port(),
Piere99511d2018-04-19 16:47:06 +0200717 mcastUtils.assignedVlan(source), mcastIp, INGRESS);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100718 if (source.deviceId().equals(sink.deviceId())) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100719 if (source.port().equals(sink.port())) {
720 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
721 mcastIp, sink, source);
722 return;
723 }
Pier7b657162018-03-27 11:29:42 -0700724 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
Piere99511d2018-04-19 16:47:06 +0200725 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, sink.deviceId(), source), INGRESS);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100726 return;
727 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100728 // Find a path. If present, create/update groups and flows for each hop
Piere99511d2018-04-19 16:47:06 +0200729 Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp, allPaths, source);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100730 if (mcastPath.isPresent()) {
731 List<Link> links = mcastPath.get().links();
Pier1a7e0c02018-03-12 15:00:54 -0700732 // Setup mcast role for ingress
Piere99511d2018-04-19 16:47:06 +0200733 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, source.deviceId(), source), INGRESS);
734 // Setup properly the transit forwarding
Pier Luigi35dab3f2018-01-25 16:16:02 +0100735 links.forEach(link -> {
736 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
Pier7b657162018-03-27 11:29:42 -0700737 mcastUtils.assignedVlan(link.src().deviceId()
738 .equals(source.deviceId()) ? source : null));
739 mcastUtils.addFilterToDevice(link.dst().deviceId(), link.dst().port(),
740 mcastUtils.assignedVlan(null), mcastIp, null);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100741 });
Pier1a7e0c02018-03-12 15:00:54 -0700742 // Setup mcast role for the transit
743 links.stream()
744 .filter(link -> !link.dst().deviceId().equals(sink.deviceId()))
Piere99511d2018-04-19 16:47:06 +0200745 .forEach(link -> mcastRoleStore.put(new McastRoleStoreKey(mcastIp, link.dst().deviceId(),
746 source), TRANSIT));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100747 // Process the egress device
Pier7b657162018-03-27 11:29:42 -0700748 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
Pier1a7e0c02018-03-12 15:00:54 -0700749 // Setup mcast role for egress
Piere99511d2018-04-19 16:47:06 +0200750 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, sink.deviceId(), source), EGRESS);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100751 } else {
Piere99511d2018-04-19 16:47:06 +0200752 log.warn("Unable to find a path from {} to {}. Abort sinkAdded", source.deviceId(), sink.deviceId());
Pier Luigi35dab3f2018-01-25 16:16:02 +0100753 }
754 } finally {
755 mcastUnlock();
Charles Chanc91c8782016-03-30 17:54:24 -0700756 }
757 }
758
759 /**
Charles Chan72779502016-04-23 17:36:10 -0700760 * Processes the LINK_DOWN event.
761 *
piereaddb182020-02-03 13:50:53 +0100762 * @param linkDown Link that is going down
Charles Chan72779502016-04-23 17:36:10 -0700763 */
piereaddb182020-02-03 13:50:53 +0100764 public void processLinkDown(Link linkDown) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100765 lastMcastChange = Instant.now();
766 mcastLock();
767 try {
768 // Get groups affected by the link down event
piereaddb182020-02-03 13:50:53 +0100769 Set<IpAddress> affectedGroups = getAffectedGroups(linkDown);
770 log.info("Processing link down {} for groups {}", linkDown, affectedGroups);
771 affectedGroups.forEach(mcastIp -> {
772 log.debug("Processing link down {} for group {}", linkDown, mcastIp);
773 recoverFailure(mcastIp, linkDown);
Charles Chan72779502016-04-23 17:36:10 -0700774 });
Pier Luigi35dab3f2018-01-25 16:16:02 +0100775 } finally {
776 mcastUnlock();
777 }
Charles Chan72779502016-04-23 17:36:10 -0700778 }
779
780 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +0100781 * Process the DEVICE_DOWN event.
782 *
783 * @param deviceDown device going down
784 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100785 public void processDeviceDown(DeviceId deviceDown) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100786 lastMcastChange = Instant.now();
787 mcastLock();
788 try {
789 // Get the mcast groups affected by the device going down
piereaddb182020-02-03 13:50:53 +0100790 Set<IpAddress> affectedGroups = getAffectedGroups(deviceDown);
791 log.info("Processing device down {} for groups {}", deviceDown, affectedGroups);
792 affectedGroups.forEach(mcastIp -> {
Piere99511d2018-04-19 16:47:06 +0200793 log.debug("Processing device down {} for group {}", deviceDown, mcastIp);
794 recoverFailure(mcastIp, deviceDown);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100795 });
796 } finally {
797 mcastUnlock();
798 }
Pier Luigi580fd8a2018-01-16 10:47:50 +0100799 }
800
801 /**
Piere99511d2018-04-19 16:47:06 +0200802 * General failure recovery procedure.
803 *
804 * @param mcastIp the group to recover
805 * @param failedElement the failed element
806 */
807 private void recoverFailure(IpAddress mcastIp, Object failedElement) {
808 // TODO Optimize when the group editing is in place
809 if (!mcastUtils.isLeader(mcastIp)) {
810 log.debug("Skip {} due to lack of leadership", mcastIp);
811 return;
812 }
813 // Do not proceed if the sources of this group are missing
814 Set<ConnectPoint> sources = getSources(mcastIp);
815 if (sources.isEmpty()) {
816 log.warn("Missing sources for group {}", mcastIp);
817 return;
818 }
819 // Find out the ingress devices of the affected group
820 // If sinks are in other leafs, we have ingress, transit, egress, and source
821 // If sinks are in the same leaf, we have just ingress and source
822 Set<DeviceId> ingressDevices = getDevice(mcastIp, INGRESS);
823 if (ingressDevices.isEmpty()) {
piereaddb182020-02-03 13:50:53 +0100824 log.warn("Missing ingress devices for group {}", mcastIp);
Piere99511d2018-04-19 16:47:06 +0200825 return;
826 }
827 // For each tree, delete ingress-transit part
828 sources.forEach(source -> {
829 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
830 transitDevices.forEach(transitDevice -> {
831 removeGroupFromDevice(transitDevice, mcastIp, mcastUtils.assignedVlan(null));
832 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, transitDevice, source));
833 });
834 });
835 removeIngressTransitPorts(mcastIp, ingressDevices, sources);
836 // TODO Evaluate the possibility of building optimize trees between sources
837 Map<DeviceId, Set<ConnectPoint>> notRecovered = Maps.newHashMap();
838 sources.forEach(source -> {
839 Set<DeviceId> notRecoveredInternal = Sets.newHashSet();
840 DeviceId ingressDevice = ingressDevices.stream()
841 .filter(deviceId -> deviceId.equals(source.deviceId())).findFirst().orElse(null);
842 // Clean also the ingress
843 if (failedElement instanceof DeviceId && ingressDevice.equals(failedElement)) {
844 removeGroupFromDevice((DeviceId) failedElement, mcastIp, mcastUtils.assignedVlan(source));
845 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, (DeviceId) failedElement, source));
846 }
847 if (ingressDevice == null) {
848 log.warn("Skip failure recovery - " +
849 "Missing ingress for source {} and group {}", source, mcastIp);
850 return;
851 }
852 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
piereaddb182020-02-03 13:50:53 +0100853 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(mcastIp, ingressDevice, egressDevices);
Piere99511d2018-04-19 16:47:06 +0200854 // We have to verify, if there are egresses without paths
855 mcastTree.forEach((egressDevice, paths) -> {
856 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
857 mcastIp, paths, source);
858 // No paths, we have to try with alternative location
859 if (!mcastPath.isPresent()) {
860 notRecovered.compute(egressDevice, (deviceId, listSources) -> {
861 listSources = listSources == null ? Sets.newHashSet() : listSources;
862 listSources.add(source);
863 return listSources;
864 });
865 notRecoveredInternal.add(egressDevice);
866 }
867 });
868 // Fast path, we can recover all the locations
869 if (notRecoveredInternal.isEmpty()) {
870 mcastTree.forEach((egressDevice, paths) -> {
Charles Chanba59dd62018-05-10 22:19:49 +0000871 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
872 mcastIp, paths, source);
873 if (mcastPath.isPresent()) {
874 installPath(mcastIp, source, mcastPath.get());
875 }
Piere99511d2018-04-19 16:47:06 +0200876 });
877 } else {
878 // Let's try to recover using alternative locations
879 recoverSinks(egressDevices, notRecoveredInternal, mcastIp,
880 ingressDevice, source);
881 }
882 });
883 // Finally remove the egresses not recovered
884 notRecovered.forEach((egressDevice, listSources) -> {
885 Set<ConnectPoint> currentSources = getSources(mcastIp, egressDevice, EGRESS);
886 if (Objects.equal(currentSources, listSources)) {
887 log.warn("Fail to recover egress device {} from {} failure {}",
888 egressDevice, failedElement instanceof Link ? "Link" : "Device", failedElement);
889 removeGroupFromDevice(egressDevice, mcastIp, mcastUtils.assignedVlan(null));
890 }
891 listSources.forEach(source -> mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, egressDevice, source)));
892 });
893 }
894
895 /**
Pier7b657162018-03-27 11:29:42 -0700896 * Try to recover sinks using alternate locations.
897 *
898 * @param egressDevices the original egress devices
899 * @param notRecovered the devices not recovered
900 * @param mcastIp the group address
901 * @param ingressDevice the ingress device
902 * @param source the source connect point
Pier7b657162018-03-27 11:29:42 -0700903 */
904 private void recoverSinks(Set<DeviceId> egressDevices, Set<DeviceId> notRecovered,
Piere99511d2018-04-19 16:47:06 +0200905 IpAddress mcastIp, DeviceId ingressDevice, ConnectPoint source) {
906 log.debug("Processing recover sinks for group {} and for source {}",
907 mcastIp, source);
Pier7b657162018-03-27 11:29:42 -0700908 Set<DeviceId> recovered = Sets.difference(egressDevices, notRecovered);
Pier7b657162018-03-27 11:29:42 -0700909 Set<ConnectPoint> totalAffectedSinks = Sets.newHashSet();
Pier7b657162018-03-27 11:29:42 -0700910 Set<ConnectPoint> totalSinks = Sets.newHashSet();
911 // Let's compute all the affected sinks and all the sinks
912 notRecovered.forEach(deviceId -> {
913 totalAffectedSinks.addAll(
Charles Chanba59dd62018-05-10 22:19:49 +0000914 mcastUtils.getAffectedSinks(deviceId, mcastIp).values().stream()
915 .flatMap(Collection::stream)
Pier7b657162018-03-27 11:29:42 -0700916 .filter(connectPoint -> connectPoint.deviceId().equals(deviceId))
Charles Chanba59dd62018-05-10 22:19:49 +0000917 .collect(Collectors.toSet())
918 );
Pier7b657162018-03-27 11:29:42 -0700919 totalSinks.addAll(
Piere99511d2018-04-19 16:47:06 +0200920 mcastUtils.getAffectedSinks(deviceId, mcastIp).values().stream()
Charles Chanba59dd62018-05-10 22:19:49 +0000921 .flatMap(Collection::stream).collect(Collectors.toSet())
922 );
Pier7b657162018-03-27 11:29:42 -0700923 });
Pier7b657162018-03-27 11:29:42 -0700924 Set<ConnectPoint> sinksToBeAdded = Sets.difference(totalSinks, totalAffectedSinks);
Piere99511d2018-04-19 16:47:06 +0200925 Set<DeviceId> newEgressDevices = sinksToBeAdded.stream()
926 .map(ConnectPoint::deviceId).collect(Collectors.toSet());
927 newEgressDevices.addAll(recovered);
928 Set<DeviceId> copyNewEgressDevices = ImmutableSet.copyOf(newEgressDevices);
929 newEgressDevices = newEgressDevices.stream()
930 .filter(deviceId -> !deviceId.equals(ingressDevice)).collect(Collectors.toSet());
piereaddb182020-02-03 13:50:53 +0100931 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(mcastIp, ingressDevice, newEgressDevices);
Pier7b657162018-03-27 11:29:42 -0700932 // if the source was originally in the new locations, add new sinks
Piere99511d2018-04-19 16:47:06 +0200933 if (copyNewEgressDevices.contains(ingressDevice)) {
Pier7b657162018-03-27 11:29:42 -0700934 sinksToBeAdded.stream()
935 .filter(connectPoint -> connectPoint.deviceId().equals(ingressDevice))
936 .forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, ImmutableList.of()));
937 }
Pier7b657162018-03-27 11:29:42 -0700938 // Construct a new path for each egress device
939 mcastTree.forEach((egressDevice, paths) -> {
Piere99511d2018-04-19 16:47:06 +0200940 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice, mcastIp, paths, source);
Pier7b657162018-03-27 11:29:42 -0700941 if (mcastPath.isPresent()) {
942 // Using recovery procedure
943 if (recovered.contains(egressDevice)) {
944 installPath(mcastIp, source, mcastPath.get());
945 } else {
946 // otherwise we need to threat as new sink
947 sinksToBeAdded.stream()
948 .filter(connectPoint -> connectPoint.deviceId().equals(egressDevice))
949 .forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, paths));
950 }
Pier7b657162018-03-27 11:29:42 -0700951 }
952 });
Pier7b657162018-03-27 11:29:42 -0700953 }
954
955 /**
Pier28164682018-04-17 15:50:43 +0200956 * Process all the sinks related to a mcast group and return
957 * the ones to be removed.
958 *
959 * @param mcastIp the group address
960 * @param prevsinks the previous sinks to be evaluated
961 * @param newSinks the new sinks to be evaluted
Piere99511d2018-04-19 16:47:06 +0200962 * @param source the source connect point
Pier28164682018-04-17 15:50:43 +0200963 * @return the set of the sinks to be removed
964 */
965 private Set<ConnectPoint> processSinksToBeRemoved(IpAddress mcastIp,
966 Map<HostId, Set<ConnectPoint>> prevsinks,
Piere99511d2018-04-19 16:47:06 +0200967 Map<HostId, Set<ConnectPoint>> newSinks,
968 ConnectPoint source) {
Pier28164682018-04-17 15:50:43 +0200969 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
piereaddb182020-02-03 13:50:53 +0100970 log.debug("Processing sinks to be removed for Multicast group {}, source {}",
971 mcastIp, source);
Pier28164682018-04-17 15:50:43 +0200972 prevsinks.forEach(((hostId, connectPoints) -> {
Shekhar Aryan27bbe2a2019-06-20 14:03:07 +0000973 if (Objects.equal(HostId.NONE, hostId)) {
Esin Karamanf1f46e32019-03-05 13:49:02 +0000974 //in this case connect points are single homed sinks.
975 //just found the difference btw previous and new sinks for this source.
976 Set<ConnectPoint> difference = Sets.difference(connectPoints, newSinks.get(hostId));
977 sinksToBeProcessed.addAll(difference);
978 return;
979 }
Pier28164682018-04-17 15:50:43 +0200980 // We have to check with the existing flows
981 ConnectPoint sinkToBeProcessed = connectPoints.stream()
Piere99511d2018-04-19 16:47:06 +0200982 .filter(connectPoint -> isSinkForSource(mcastIp, connectPoint, source))
Pier28164682018-04-17 15:50:43 +0200983 .findFirst().orElse(null);
984 if (sinkToBeProcessed != null) {
985 // If the host has been removed or location has been removed
986 if (!newSinks.containsKey(hostId) ||
987 !newSinks.get(hostId).contains(sinkToBeProcessed)) {
988 sinksToBeProcessed.add(sinkToBeProcessed);
989 }
990 }
991 }));
992 // We have done, return the set
993 return sinksToBeProcessed;
994 }
995
996 /**
Pier7b657162018-03-27 11:29:42 -0700997 * Process new locations and return the set of sinks to be added
998 * in the context of the recovery.
999 *
Pier28164682018-04-17 15:50:43 +02001000 * @param newSinks the remaining sinks
1001 * @param prevSinks the previous sinks
Piere99511d2018-04-19 16:47:06 +02001002 * @param source the source connect point
Pier7b657162018-03-27 11:29:42 -07001003 * @return the set of the sinks to be processed
1004 */
Charles Chanba59dd62018-05-10 22:19:49 +00001005 private Set<ConnectPoint> processSinksToBeRecovered(IpAddress mcastIp,
1006 Map<HostId, Set<ConnectPoint>> newSinks,
Piere99511d2018-04-19 16:47:06 +02001007 Map<HostId, Set<ConnectPoint>> prevSinks,
1008 ConnectPoint source) {
Pier7b657162018-03-27 11:29:42 -07001009 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
piereaddb182020-02-03 13:50:53 +01001010 log.debug("Processing sinks to be recovered for Multicast group {}, source {}",
1011 mcastIp, source);
Pier28164682018-04-17 15:50:43 +02001012 newSinks.forEach((hostId, connectPoints) -> {
Pier7b657162018-03-27 11:29:42 -07001013 // If it has more than 1 locations
1014 if (connectPoints.size() > 1 || connectPoints.size() == 0) {
1015 log.debug("Skip {} since sink {} has {} locations",
1016 mcastIp, hostId, connectPoints.size());
1017 return;
1018 }
Pier28164682018-04-17 15:50:43 +02001019 // If previously it had two locations, we need to recover it
1020 // Filter out if the remaining location is already served
1021 if (prevSinks.containsKey(hostId) && prevSinks.get(hostId).size() == 2) {
Pier665b0fc2018-04-19 15:53:20 +02001022 ConnectPoint sinkToBeProcessed = connectPoints.stream()
Piere99511d2018-04-19 16:47:06 +02001023 .filter(connectPoint -> !isSinkForSource(mcastIp, connectPoint, source))
Pier665b0fc2018-04-19 15:53:20 +02001024 .findFirst().orElse(null);
1025 if (sinkToBeProcessed != null) {
1026 sinksToBeProcessed.add(sinkToBeProcessed);
1027 }
Pier28164682018-04-17 15:50:43 +02001028 }
Pier7b657162018-03-27 11:29:42 -07001029 });
1030 return sinksToBeProcessed;
1031 }
1032
1033 /**
1034 * Process all the sinks related to a mcast group and return
1035 * the ones to be processed.
1036 *
1037 * @param source the source connect point
1038 * @param mcastIp the group address
1039 * @param sinks the sinks to be evaluated
1040 * @return the set of the sinks to be processed
1041 */
1042 private Set<ConnectPoint> processSinksToBeAdded(ConnectPoint source, IpAddress mcastIp,
1043 Map<HostId, Set<ConnectPoint>> sinks) {
Pier7b657162018-03-27 11:29:42 -07001044 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
piereaddb182020-02-03 13:50:53 +01001045 log.debug("Processing sinks to be added for Multicast group {}, source {}",
1046 mcastIp, source);
Pier7b657162018-03-27 11:29:42 -07001047 sinks.forEach(((hostId, connectPoints) -> {
Esin Karamanf1f46e32019-03-05 13:49:02 +00001048 //add all connect points that are not tied with any host
Shekhar Aryan27bbe2a2019-06-20 14:03:07 +00001049 if (Objects.equal(HostId.NONE, hostId)) {
Esin Karamanf1f46e32019-03-05 13:49:02 +00001050 sinksToBeProcessed.addAll(connectPoints);
1051 return;
1052 }
Pier7b657162018-03-27 11:29:42 -07001053 // If it has more than 2 locations
1054 if (connectPoints.size() > 2 || connectPoints.size() == 0) {
1055 log.debug("Skip {} since sink {} has {} locations",
1056 mcastIp, hostId, connectPoints.size());
1057 return;
1058 }
1059 // If it has one location, just use it
1060 if (connectPoints.size() == 1) {
Piere99511d2018-04-19 16:47:06 +02001061 sinksToBeProcessed.add(connectPoints.stream().findFirst().orElse(null));
Pier7b657162018-03-27 11:29:42 -07001062 return;
1063 }
1064 // We prefer to reuse existing flows
1065 ConnectPoint sinkToBeProcessed = connectPoints.stream()
Piere99511d2018-04-19 16:47:06 +02001066 .filter(connectPoint -> {
1067 if (!isSinkForGroup(mcastIp, connectPoint, source)) {
1068 return false;
1069 }
1070 if (!isSinkReachable(mcastIp, connectPoint, source)) {
1071 return false;
1072 }
1073 ConnectPoint other = connectPoints.stream()
Charles Chanba59dd62018-05-10 22:19:49 +00001074 .filter(remaining -> !remaining.equals(connectPoint))
1075 .findFirst().orElse(null);
Piere99511d2018-04-19 16:47:06 +02001076 // We are already serving the sink
1077 return !isSinkForSource(mcastIp, other, source);
1078 }).findFirst().orElse(null);
1079
Pier7b657162018-03-27 11:29:42 -07001080 if (sinkToBeProcessed != null) {
1081 sinksToBeProcessed.add(sinkToBeProcessed);
1082 return;
1083 }
1084 // Otherwise we prefer to reuse existing egresses
Piere99511d2018-04-19 16:47:06 +02001085 Set<DeviceId> egresses = getDevice(mcastIp, EGRESS, source);
Pier7b657162018-03-27 11:29:42 -07001086 sinkToBeProcessed = connectPoints.stream()
Piere99511d2018-04-19 16:47:06 +02001087 .filter(connectPoint -> {
1088 if (!egresses.contains(connectPoint.deviceId())) {
1089 return false;
1090 }
1091 if (!isSinkReachable(mcastIp, connectPoint, source)) {
1092 return false;
1093 }
1094 ConnectPoint other = connectPoints.stream()
Charles Chanba59dd62018-05-10 22:19:49 +00001095 .filter(remaining -> !remaining.equals(connectPoint))
1096 .findFirst().orElse(null);
Piere99511d2018-04-19 16:47:06 +02001097 return !isSinkForSource(mcastIp, other, source);
1098 }).findFirst().orElse(null);
Pier7b657162018-03-27 11:29:42 -07001099 if (sinkToBeProcessed != null) {
1100 sinksToBeProcessed.add(sinkToBeProcessed);
1101 return;
1102 }
1103 // Otherwise we prefer a location co-located with the source (if it exists)
1104 sinkToBeProcessed = connectPoints.stream()
1105 .filter(connectPoint -> connectPoint.deviceId().equals(source.deviceId()))
1106 .findFirst().orElse(null);
1107 if (sinkToBeProcessed != null) {
1108 sinksToBeProcessed.add(sinkToBeProcessed);
1109 return;
1110 }
Piere99511d2018-04-19 16:47:06 +02001111 // Finally, we randomly pick a new location if it is reachable
1112 sinkToBeProcessed = connectPoints.stream()
1113 .filter(connectPoint -> {
1114 if (!isSinkReachable(mcastIp, connectPoint, source)) {
1115 return false;
1116 }
1117 ConnectPoint other = connectPoints.stream()
Charles Chanba59dd62018-05-10 22:19:49 +00001118 .filter(remaining -> !remaining.equals(connectPoint))
1119 .findFirst().orElse(null);
Piere99511d2018-04-19 16:47:06 +02001120 return !isSinkForSource(mcastIp, other, source);
1121 }).findFirst().orElse(null);
1122 if (sinkToBeProcessed != null) {
1123 sinksToBeProcessed.add(sinkToBeProcessed);
1124 }
Pier7b657162018-03-27 11:29:42 -07001125 }));
Pier7b657162018-03-27 11:29:42 -07001126 return sinksToBeProcessed;
1127 }
1128
1129 /**
Pier1a7e0c02018-03-12 15:00:54 -07001130 * Utility method to remove all the ingress transit ports.
1131 *
1132 * @param mcastIp the group ip
Piere99511d2018-04-19 16:47:06 +02001133 * @param ingressDevices the ingress devices
1134 * @param sources the source connect points
Pier1a7e0c02018-03-12 15:00:54 -07001135 */
Piere99511d2018-04-19 16:47:06 +02001136 private void removeIngressTransitPorts(IpAddress mcastIp, Set<DeviceId> ingressDevices,
1137 Set<ConnectPoint> sources) {
1138 Map<ConnectPoint, Set<PortNumber>> ingressTransitPorts = Maps.newHashMap();
1139 sources.forEach(source -> {
1140 DeviceId ingressDevice = ingressDevices.stream()
1141 .filter(deviceId -> deviceId.equals(source.deviceId()))
1142 .findFirst().orElse(null);
1143 if (ingressDevice == null) {
1144 log.warn("Skip removeIngressTransitPorts - " +
1145 "Missing ingress for source {} and group {}",
1146 source, mcastIp);
1147 return;
Pier1a7e0c02018-03-12 15:00:54 -07001148 }
Andrea Campanella5b4cd652018-06-05 14:19:21 +02001149 Set<PortNumber> ingressTransitPort = ingressTransitPort(mcastIp, ingressDevice, source);
1150 if (ingressTransitPort.isEmpty()) {
1151 log.warn("No transit ports to remove on device {}", ingressDevice);
1152 return;
1153 }
1154 ingressTransitPorts.put(source, ingressTransitPort);
Pier1a7e0c02018-03-12 15:00:54 -07001155 });
Piere99511d2018-04-19 16:47:06 +02001156 ingressTransitPorts.forEach((source, ports) -> ports.forEach(ingressTransitPort -> {
1157 DeviceId ingressDevice = ingressDevices.stream()
Charles Chanba59dd62018-05-10 22:19:49 +00001158 .filter(deviceId -> deviceId.equals(source.deviceId()))
1159 .findFirst().orElse(null);
Piere99511d2018-04-19 16:47:06 +02001160 boolean isLast = removePortFromDevice(ingressDevice, ingressTransitPort,
1161 mcastIp, mcastUtils.assignedVlan(source));
1162 if (isLast) {
1163 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, ingressDevice, source));
1164 }
1165 }));
Pier1a7e0c02018-03-12 15:00:54 -07001166 }
1167
1168 /**
Charles Chanc91c8782016-03-30 17:54:24 -07001169 * Adds a port to given multicast group on given device. This involves the
1170 * update of L3 multicast group and multicast routing table entry.
1171 *
1172 * @param deviceId device ID
1173 * @param port port to be added
1174 * @param mcastIp multicast group
1175 * @param assignedVlan assigned VLAN ID
1176 */
Charles Chanba59dd62018-05-10 22:19:49 +00001177 private void addPortToDevice(DeviceId deviceId, PortNumber port,
1178 IpAddress mcastIp, VlanId assignedVlan) {
Piere99511d2018-04-19 16:47:06 +02001179 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId, assignedVlan);
Charles Chanc91c8782016-03-30 17:54:24 -07001180 ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
Pier Luigi4f0dd212018-01-19 10:24:53 +01001181 NextObjective newNextObj;
Charles Chan72779502016-04-23 17:36:10 -07001182 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Charles Chanc91c8782016-03-30 17:54:24 -07001183 // First time someone request this mcast group via this device
1184 portBuilder.add(port);
Pier Luigi4f0dd212018-01-19 10:24:53 +01001185 // New nextObj
Vignesh Ethiraj75790122019-08-26 12:18:42 +00001186 if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
1187 log.debug("Passing 0 as nextId for unconfigured device {}", deviceId);
1188 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
1189 portBuilder.build(), 0).add();
1190 } else {
1191 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
1192 portBuilder.build(), null).add();
1193 }
Pier Luigi4f0dd212018-01-19 10:24:53 +01001194 // Store the new port
1195 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Charles Chanc91c8782016-03-30 17:54:24 -07001196 } else {
1197 // This device already serves some subscribers of this mcast group
Charles Chan72779502016-04-23 17:36:10 -07001198 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chanc91c8782016-03-30 17:54:24 -07001199 // Stop if the port is already in the nextobj
Pier7b657162018-03-27 11:29:42 -07001200 Set<PortNumber> existingPorts = mcastUtils.getPorts(nextObj.next());
Charles Chanc91c8782016-03-30 17:54:24 -07001201 if (existingPorts.contains(port)) {
piereaddb182020-02-03 13:50:53 +01001202 log.debug("Port {}/{} already exists for {}. Abort", deviceId, port, mcastIp);
Charles Chanc91c8782016-03-30 17:54:24 -07001203 return;
1204 }
Pier Luigi4f0dd212018-01-19 10:24:53 +01001205 // Let's add the port and reuse the previous one
Yuta HIGUCHIbef07b52018-02-09 18:05:23 -08001206 portBuilder.addAll(existingPorts).add(port);
Pier Luigi4f0dd212018-01-19 10:24:53 +01001207 // Reuse previous nextObj
Pier7b657162018-03-27 11:29:42 -07001208 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi4f0dd212018-01-19 10:24:53 +01001209 portBuilder.build(), nextObj.id()).addToExisting();
1210 // Store the final next objective and send only the difference to the driver
1211 mcastNextObjStore.put(mcastStoreKey, newNextObj);
1212 // Add just the new port
1213 portBuilder = ImmutableSet.builder();
1214 portBuilder.add(port);
Pier7b657162018-03-27 11:29:42 -07001215 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi4f0dd212018-01-19 10:24:53 +01001216 portBuilder.build(), nextObj.id()).addToExisting();
Charles Chanc91c8782016-03-30 17:54:24 -07001217 }
1218 // Create, store and apply the new nextObj and fwdObj
Charles Chan72779502016-04-23 17:36:10 -07001219 ObjectiveContext context = new DefaultObjectiveContext(
1220 (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}",
1221 mcastIp, deviceId, port.toLong(), assignedVlan),
Charles Chanfacfbef2018-08-23 14:30:33 -07001222 (objective, error) -> {
1223 log.warn("Failed to add {} on {}/{}, vlan {}: {}",
1224 mcastIp, deviceId, port.toLong(), assignedVlan, error);
1225 srManager.invalidateNextObj(objective.id());
1226 });
Pier7b657162018-03-27 11:29:42 -07001227 ForwardingObjective fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan,
1228 newNextObj.id()).add(context);
Vignesh Ethiraj75790122019-08-26 12:18:42 +00001229 if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
1230 log.debug("skip next and forward flowobjective addition for device: {}", deviceId);
1231 } else {
1232 srManager.flowObjectiveService.next(deviceId, newNextObj);
1233 srManager.flowObjectiveService.forward(deviceId, fwdObj);
1234 }
Charles Chanc91c8782016-03-30 17:54:24 -07001235 }
1236
1237 /**
1238 * Removes a port from given multicast group on given device.
1239 * This involves the update of L3 multicast group and multicast routing
1240 * table entry.
1241 *
1242 * @param deviceId device ID
1243 * @param port port to be added
1244 * @param mcastIp multicast group
1245 * @param assignedVlan assigned VLAN ID
1246 * @return true if this is the last sink on this device
1247 */
Charles Chanba59dd62018-05-10 22:19:49 +00001248 private boolean removePortFromDevice(DeviceId deviceId, PortNumber port,
1249 IpAddress mcastIp, VlanId assignedVlan) {
Charles Chan72779502016-04-23 17:36:10 -07001250 McastStoreKey mcastStoreKey =
Piere99511d2018-04-19 16:47:06 +02001251 new McastStoreKey(mcastIp, deviceId, assignedVlan);
Charles Chanc91c8782016-03-30 17:54:24 -07001252 // This device is not serving this multicast group
Charles Chan72779502016-04-23 17:36:10 -07001253 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Piere99511d2018-04-19 16:47:06 +02001254 return true;
Charles Chanc91c8782016-03-30 17:54:24 -07001255 }
Charles Chan72779502016-04-23 17:36:10 -07001256 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Pier7b657162018-03-27 11:29:42 -07001257 Set<PortNumber> existingPorts = mcastUtils.getPorts(nextObj.next());
Charles Chan72779502016-04-23 17:36:10 -07001258 // This port does not serve this multicast group
Charles Chanc91c8782016-03-30 17:54:24 -07001259 if (!existingPorts.contains(port)) {
Piere99511d2018-04-19 16:47:06 +02001260 if (!existingPorts.isEmpty()) {
piereaddb182020-02-03 13:50:53 +01001261 log.debug("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
Piere99511d2018-04-19 16:47:06 +02001262 return false;
1263 }
1264 return true;
Charles Chanc91c8782016-03-30 17:54:24 -07001265 }
1266 // Copy and modify the ImmutableSet
1267 existingPorts = Sets.newHashSet(existingPorts);
1268 existingPorts.remove(port);
Charles Chanc91c8782016-03-30 17:54:24 -07001269 NextObjective newNextObj;
Pier Luigi8cd46de2018-01-19 10:24:53 +01001270 ObjectiveContext context;
Charles Chanc91c8782016-03-30 17:54:24 -07001271 ForwardingObjective fwdObj;
1272 if (existingPorts.isEmpty()) {
Pier Luigi8cd46de2018-01-19 10:24:53 +01001273 context = new DefaultObjectiveContext(
Charles Chan72779502016-04-23 17:36:10 -07001274 (objective) -> log.debug("Successfully remove {} on {}/{}, vlan {}",
1275 mcastIp, deviceId, port.toLong(), assignedVlan),
Piere99511d2018-04-19 16:47:06 +02001276 (objective, error) -> log.warn("Failed to remove {} on {}/{}, vlan {}: {}",
Charles Chan72779502016-04-23 17:36:10 -07001277 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Pier7b657162018-03-27 11:29:42 -07001278 fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
Charles Chan72779502016-04-23 17:36:10 -07001279 mcastNextObjStore.remove(mcastStoreKey);
Charles Chanc91c8782016-03-30 17:54:24 -07001280 } else {
1281 // If this is not the last sink, update flows and groups
Pier Luigi8cd46de2018-01-19 10:24:53 +01001282 context = new DefaultObjectiveContext(
Charles Chan72779502016-04-23 17:36:10 -07001283 (objective) -> log.debug("Successfully update {} on {}/{}, vlan {}",
1284 mcastIp, deviceId, port.toLong(), assignedVlan),
Charles Chanfacfbef2018-08-23 14:30:33 -07001285 (objective, error) -> {
1286 log.warn("Failed to update {} on {}/{}, vlan {}: {}",
1287 mcastIp, deviceId, port.toLong(), assignedVlan, error);
1288 srManager.invalidateNextObj(objective.id());
1289 });
Pier Luigi8cd46de2018-01-19 10:24:53 +01001290 // Here we store the next objective with the remaining port
Pier7b657162018-03-27 11:29:42 -07001291 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi8cd46de2018-01-19 10:24:53 +01001292 existingPorts, nextObj.id()).removeFromExisting();
Pier7b657162018-03-27 11:29:42 -07001293 fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add(context);
Charles Chan72779502016-04-23 17:36:10 -07001294 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Charles Chanc91c8782016-03-30 17:54:24 -07001295 }
Pier Luigi8cd46de2018-01-19 10:24:53 +01001296 // Let's modify the next objective removing the bucket
Pier7b657162018-03-27 11:29:42 -07001297 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi8cd46de2018-01-19 10:24:53 +01001298 ImmutableSet.of(port), nextObj.id()).removeFromExisting();
Vignesh Ethiraj75790122019-08-26 12:18:42 +00001299 if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
1300 log.debug("skip forward and next flow objectives from adding flows on device: {}", deviceId);
1301 } else {
1302 srManager.flowObjectiveService.next(deviceId, newNextObj);
1303 srManager.flowObjectiveService.forward(deviceId, fwdObj);
1304 }
Charles Chanc91c8782016-03-30 17:54:24 -07001305 return existingPorts.isEmpty();
1306 }
1307
Charles Chan72779502016-04-23 17:36:10 -07001308 /**
1309 * Removes entire group on given device.
1310 *
1311 * @param deviceId device ID
1312 * @param mcastIp multicast group to be removed
1313 * @param assignedVlan assigned VLAN ID
1314 */
Charles Chanba59dd62018-05-10 22:19:49 +00001315 private void removeGroupFromDevice(DeviceId deviceId, IpAddress mcastIp,
1316 VlanId assignedVlan) {
Piere99511d2018-04-19 16:47:06 +02001317 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId, assignedVlan);
Charles Chan72779502016-04-23 17:36:10 -07001318 // This device is not serving this multicast group
1319 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
piereaddb182020-02-03 13:50:53 +01001320 log.debug("{} is not serving {}. Abort.", deviceId, mcastIp);
Charles Chan72779502016-04-23 17:36:10 -07001321 return;
1322 }
1323 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chan72779502016-04-23 17:36:10 -07001324 ObjectiveContext context = new DefaultObjectiveContext(
1325 (objective) -> log.debug("Successfully remove {} on {}, vlan {}",
1326 mcastIp, deviceId, assignedVlan),
Piere99511d2018-04-19 16:47:06 +02001327 (objective, error) -> log.warn("Failed to remove {} on {}, vlan {}: {}",
Charles Chan72779502016-04-23 17:36:10 -07001328 mcastIp, deviceId, assignedVlan, error));
Vignesh Ethiraj75790122019-08-26 12:18:42 +00001329 if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
1330 log.debug("skip flow changes on unconfigured device: {}", deviceId);
1331 } else {
1332 ForwardingObjective fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
1333 srManager.flowObjectiveService.forward(deviceId, fwdObj);
1334 }
Charles Chan72779502016-04-23 17:36:10 -07001335 mcastNextObjStore.remove(mcastStoreKey);
Charles Chan72779502016-04-23 17:36:10 -07001336 }
1337
Pier Luigi580fd8a2018-01-16 10:47:50 +01001338 private void installPath(IpAddress mcastIp, ConnectPoint source, Path mcastPath) {
Pier Luigi580fd8a2018-01-16 10:47:50 +01001339 List<Link> links = mcastPath.links();
kezhiyong168fbba2018-12-03 16:14:29 +08001340 if (links.isEmpty()) {
1341 log.warn("There is no link that can be used. Stopping installation.");
1342 return;
1343 }
Pier1a7e0c02018-03-12 15:00:54 -07001344 // Setup new ingress mcast role
Piere99511d2018-04-19 16:47:06 +02001345 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, links.get(0).src().deviceId(), source),
Pier1a7e0c02018-03-12 15:00:54 -07001346 INGRESS);
Pier Luigi580fd8a2018-01-16 10:47:50 +01001347 // For each link, modify the next on the source device adding the src port
1348 // and a new filter objective on the destination port
1349 links.forEach(link -> {
1350 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
Pier7b657162018-03-27 11:29:42 -07001351 mcastUtils.assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
1352 mcastUtils.addFilterToDevice(link.dst().deviceId(), link.dst().port(),
1353 mcastUtils.assignedVlan(null), mcastIp, null);
Pier Luigi580fd8a2018-01-16 10:47:50 +01001354 });
Pier1a7e0c02018-03-12 15:00:54 -07001355 // Setup mcast role for the transit
1356 links.stream()
1357 .filter(link -> !link.src().deviceId().equals(source.deviceId()))
Piere99511d2018-04-19 16:47:06 +02001358 .forEach(link -> mcastRoleStore.put(new McastRoleStoreKey(mcastIp, link.src().deviceId(), source),
Pier1a7e0c02018-03-12 15:00:54 -07001359 TRANSIT));
Charles Chan72779502016-04-23 17:36:10 -07001360 }
1361
Charles Chanc91c8782016-03-30 17:54:24 -07001362 /**
Pier1f87aca2018-03-14 16:47:32 -07001363 * Go through all the paths, looking for shared links to be used
1364 * in the final path computation.
1365 *
1366 * @param egresses egress devices
1367 * @param availablePaths all the available paths towards the egress
1368 * @return shared links between egress devices
1369 */
Charles Chanba59dd62018-05-10 22:19:49 +00001370 private Set<Link> exploreMcastTree(Set<DeviceId> egresses,
1371 Map<DeviceId, List<Path>> availablePaths) {
Pier1f87aca2018-03-14 16:47:32 -07001372 int minLength = Integer.MAX_VALUE;
1373 int length;
Pier1f87aca2018-03-14 16:47:32 -07001374 List<Path> currentPaths;
1375 // Verify the source can still reach all the egresses
1376 for (DeviceId egress : egresses) {
1377 // From the source we cannot reach all the sinks
Pier7b657162018-03-27 11:29:42 -07001378 // just continue and let's figure out after
Pier1f87aca2018-03-14 16:47:32 -07001379 currentPaths = availablePaths.get(egress);
1380 if (currentPaths.isEmpty()) {
1381 continue;
1382 }
Piere99511d2018-04-19 16:47:06 +02001383 // Get the length of the first one available, update the min length
Pier1f87aca2018-03-14 16:47:32 -07001384 length = currentPaths.get(0).links().size();
1385 if (length < minLength) {
1386 minLength = length;
1387 }
Pier Luigi51ee7c02018-02-23 19:57:40 +01001388 }
Pier1f87aca2018-03-14 16:47:32 -07001389 // If there are no paths
1390 if (minLength == Integer.MAX_VALUE) {
1391 return Collections.emptySet();
1392 }
Pier1f87aca2018-03-14 16:47:32 -07001393 int index = 0;
Pier1f87aca2018-03-14 16:47:32 -07001394 Set<Link> sharedLinks = Sets.newHashSet();
1395 Set<Link> currentSharedLinks;
1396 Set<Link> currentLinks;
Pier7b657162018-03-27 11:29:42 -07001397 DeviceId egressToRemove = null;
Pier1f87aca2018-03-14 16:47:32 -07001398 // Let's find out the shared links
1399 while (index < minLength) {
1400 // Initialize the intersection with the paths related to the first egress
Piere99511d2018-04-19 16:47:06 +02001401 currentPaths = availablePaths.get(egresses.stream().findFirst().orElse(null));
Pier1f87aca2018-03-14 16:47:32 -07001402 currentSharedLinks = Sets.newHashSet();
1403 // Iterate over the paths and take the "index" links
1404 for (Path path : currentPaths) {
1405 currentSharedLinks.add(path.links().get(index));
1406 }
1407 // Iterate over the remaining egress
1408 for (DeviceId egress : egresses) {
1409 // Iterate over the paths and take the "index" links
1410 currentLinks = Sets.newHashSet();
1411 for (Path path : availablePaths.get(egress)) {
1412 currentLinks.add(path.links().get(index));
1413 }
1414 // Do intersection
1415 currentSharedLinks = Sets.intersection(currentSharedLinks, currentLinks);
1416 // If there are no shared paths exit and record the device to remove
1417 // we have to retry with a subset of sinks
1418 if (currentSharedLinks.isEmpty()) {
Pier7b657162018-03-27 11:29:42 -07001419 egressToRemove = egress;
Pier1f87aca2018-03-14 16:47:32 -07001420 index = minLength;
1421 break;
1422 }
1423 }
1424 sharedLinks.addAll(currentSharedLinks);
1425 index++;
1426 }
Piere99511d2018-04-19 16:47:06 +02001427 // If the shared links is empty and there are egress let's retry another time with less sinks,
1428 // we can still build optimal subtrees
Pier7b657162018-03-27 11:29:42 -07001429 if (sharedLinks.isEmpty() && egresses.size() > 1 && egressToRemove != null) {
1430 egresses.remove(egressToRemove);
Pier1f87aca2018-03-14 16:47:32 -07001431 sharedLinks = exploreMcastTree(egresses, availablePaths);
1432 }
1433 return sharedLinks;
1434 }
1435
1436 /**
1437 * Build Mcast tree having as root the given source and as leaves the given egress points.
1438 *
piereaddb182020-02-03 13:50:53 +01001439 * @param mcastIp multicast group
Pier1f87aca2018-03-14 16:47:32 -07001440 * @param source source of the tree
1441 * @param sinks leaves of the tree
1442 * @return the computed Mcast tree
1443 */
piereaddb182020-02-03 13:50:53 +01001444 private Map<ConnectPoint, List<Path>> computeSinkMcastTree(IpAddress mcastIp,
1445 DeviceId source,
Charles Chanba59dd62018-05-10 22:19:49 +00001446 Set<ConnectPoint> sinks) {
Pier1f87aca2018-03-14 16:47:32 -07001447 // Get the egress devices, remove source from the egress if present
Piere99511d2018-04-19 16:47:06 +02001448 Set<DeviceId> egresses = sinks.stream().map(ConnectPoint::deviceId)
1449 .filter(deviceId -> !deviceId.equals(source)).collect(Collectors.toSet());
piereaddb182020-02-03 13:50:53 +01001450 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(mcastIp, source, egresses);
Pier1f87aca2018-03-14 16:47:32 -07001451 final Map<ConnectPoint, List<Path>> finalTree = Maps.newHashMap();
Pier7b657162018-03-27 11:29:42 -07001452 // We need to put back the source if it was originally present
1453 sinks.forEach(sink -> {
1454 List<Path> sinkPaths = mcastTree.get(sink.deviceId());
1455 finalTree.put(sink, sinkPaths != null ? sinkPaths : ImmutableList.of());
1456 });
Pier1f87aca2018-03-14 16:47:32 -07001457 return finalTree;
1458 }
1459
1460 /**
1461 * Build Mcast tree having as root the given source and as leaves the given egress.
1462 *
piereaddb182020-02-03 13:50:53 +01001463 * @param mcastIp multicast group
Pier1f87aca2018-03-14 16:47:32 -07001464 * @param source source of the tree
1465 * @param egresses leaves of the tree
1466 * @return the computed Mcast tree
1467 */
piereaddb182020-02-03 13:50:53 +01001468 private Map<DeviceId, List<Path>> computeMcastTree(IpAddress mcastIp,
1469 DeviceId source,
Pier1f87aca2018-03-14 16:47:32 -07001470 Set<DeviceId> egresses) {
piereaddb182020-02-03 13:50:53 +01001471 log.debug("Computing tree for Multicast group {}, source {} and leafs {}",
1472 mcastIp, source, egresses);
Pier1f87aca2018-03-14 16:47:32 -07001473 // Pre-compute all the paths
1474 Map<DeviceId, List<Path>> availablePaths = Maps.newHashMap();
Pier1f87aca2018-03-14 16:47:32 -07001475 egresses.forEach(egress -> availablePaths.put(egress, getPaths(source, egress,
1476 Collections.emptySet())));
1477 // Explore the topology looking for shared links amongst the egresses
1478 Set<Link> linksToEnforce = exploreMcastTree(Sets.newHashSet(egresses), availablePaths);
Pier1f87aca2018-03-14 16:47:32 -07001479 // Build the final paths enforcing the shared links between egress devices
Piere99511d2018-04-19 16:47:06 +02001480 availablePaths.clear();
Pier1f87aca2018-03-14 16:47:32 -07001481 egresses.forEach(egress -> availablePaths.put(egress, getPaths(source, egress,
1482 linksToEnforce)));
1483 return availablePaths;
1484 }
1485
1486 /**
1487 * Gets path from src to dst computed using the custom link weigher.
1488 *
1489 * @param src source device ID
1490 * @param dst destination device ID
1491 * @return list of paths from src to dst
1492 */
1493 private List<Path> getPaths(DeviceId src, DeviceId dst, Set<Link> linksToEnforce) {
Pier1f87aca2018-03-14 16:47:32 -07001494 final Topology currentTopology = topologyService.currentTopology();
Pier1f87aca2018-03-14 16:47:32 -07001495 final LinkWeigher linkWeigher = new SRLinkWeigher(srManager, src, linksToEnforce);
Piere99511d2018-04-19 16:47:06 +02001496 List<Path> allPaths = Lists.newArrayList(topologyService.getPaths(currentTopology, src, dst, linkWeigher));
piereaddb182020-02-03 13:50:53 +01001497 log.trace("{} path(s) found from {} to {}", allPaths.size(), src, dst);
Pier1f87aca2018-03-14 16:47:32 -07001498 return allPaths;
Pier Luigi51ee7c02018-02-23 19:57:40 +01001499 }
1500
Charles Chanc91c8782016-03-30 17:54:24 -07001501 /**
1502 * Gets a path from src to dst.
1503 * If a path was allocated before, returns the allocated path.
1504 * Otherwise, randomly pick one from available paths.
1505 *
1506 * @param src source device ID
1507 * @param dst destination device ID
1508 * @param mcastIp multicast group
Pier1f87aca2018-03-14 16:47:32 -07001509 * @param allPaths paths list
Charles Chanc91c8782016-03-30 17:54:24 -07001510 * @return an optional path from src to dst
1511 */
Piere99511d2018-04-19 16:47:06 +02001512 private Optional<Path> getPath(DeviceId src, DeviceId dst, IpAddress mcastIp,
1513 List<Path> allPaths, ConnectPoint source) {
Pier1f87aca2018-03-14 16:47:32 -07001514 if (allPaths == null) {
1515 allPaths = getPaths(src, dst, Collections.emptySet());
1516 }
Charles Chanc91c8782016-03-30 17:54:24 -07001517 if (allPaths.isEmpty()) {
Charles Chanc91c8782016-03-30 17:54:24 -07001518 return Optional.empty();
1519 }
Piere99511d2018-04-19 16:47:06 +02001520 // Create a map index of suitability-to-list of paths. For example
Pier Luigi91573e12018-01-23 16:06:38 +01001521 // a path in the list associated to the index 1 shares only the
1522 // first hop and it is less suitable of a path belonging to the index
1523 // 2 that shares leaf-spine.
1524 Map<Integer, List<Path>> eligiblePaths = Maps.newHashMap();
Pier Luigi91573e12018-01-23 16:06:38 +01001525 int nhop;
1526 McastStoreKey mcastStoreKey;
Pier Luigi91573e12018-01-23 16:06:38 +01001527 PortNumber srcPort;
1528 Set<PortNumber> existingPorts;
1529 NextObjective nextObj;
Pier Luigi91573e12018-01-23 16:06:38 +01001530 for (Path path : allPaths) {
Pier Luigi91573e12018-01-23 16:06:38 +01001531 if (!src.equals(path.links().get(0).src().deviceId())) {
1532 continue;
1533 }
1534 nhop = 0;
1535 // Iterate over the links
Piere99511d2018-04-19 16:47:06 +02001536 for (Link hop : path.links()) {
1537 VlanId assignedVlan = mcastUtils.assignedVlan(hop.src().deviceId().equals(src) ?
1538 source : null);
1539 mcastStoreKey = new McastStoreKey(mcastIp, hop.src().deviceId(), assignedVlan);
1540 // It does not exist in the store, go to the next link
Pier Luigi91573e12018-01-23 16:06:38 +01001541 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Piere99511d2018-04-19 16:47:06 +02001542 continue;
Charles Chanc91c8782016-03-30 17:54:24 -07001543 }
Pier Luigi91573e12018-01-23 16:06:38 +01001544 nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Pier7b657162018-03-27 11:29:42 -07001545 existingPorts = mcastUtils.getPorts(nextObj.next());
Pier Luigi91573e12018-01-23 16:06:38 +01001546 srcPort = hop.src().port();
Piere99511d2018-04-19 16:47:06 +02001547 // the src port is not used as output, go to the next link
Pier Luigi91573e12018-01-23 16:06:38 +01001548 if (!existingPorts.contains(srcPort)) {
Piere99511d2018-04-19 16:47:06 +02001549 continue;
Pier Luigi91573e12018-01-23 16:06:38 +01001550 }
1551 nhop++;
1552 }
1553 // n_hop defines the index
1554 if (nhop > 0) {
1555 eligiblePaths.compute(nhop, (index, paths) -> {
1556 paths = paths == null ? Lists.newArrayList() : paths;
1557 paths.add(path);
1558 return paths;
1559 });
Charles Chanc91c8782016-03-30 17:54:24 -07001560 }
1561 }
Pier Luigi91573e12018-01-23 16:06:38 +01001562 if (eligiblePaths.isEmpty()) {
piereaddb182020-02-03 13:50:53 +01001563 log.trace("No eligiblePath(s) found from {} to {}", src, dst);
Pier Luigi91573e12018-01-23 16:06:38 +01001564 Collections.shuffle(allPaths);
1565 return allPaths.stream().findFirst();
1566 }
Pier Luigi91573e12018-01-23 16:06:38 +01001567 // Let's take the best ones
Piere99511d2018-04-19 16:47:06 +02001568 Integer bestIndex = eligiblePaths.keySet().stream()
1569 .sorted(Comparator.reverseOrder()).findFirst().orElse(null);
Pier Luigi91573e12018-01-23 16:06:38 +01001570 List<Path> bestPaths = eligiblePaths.get(bestIndex);
piereaddb182020-02-03 13:50:53 +01001571 log.trace("{} eligiblePath(s) found from {} to {}",
Pier Luigi91573e12018-01-23 16:06:38 +01001572 bestPaths.size(), src, dst);
Pier Luigi91573e12018-01-23 16:06:38 +01001573 Collections.shuffle(bestPaths);
1574 return bestPaths.stream().findFirst();
Charles Chanc91c8782016-03-30 17:54:24 -07001575 }
1576
1577 /**
Piere99511d2018-04-19 16:47:06 +02001578 * Gets device(s) of given role and of given source in given multicast tree.
1579 *
1580 * @param mcastIp multicast IP
1581 * @param role multicast role
1582 * @param source source connect point
1583 * @return set of device ID or empty set if not found
1584 */
1585 private Set<DeviceId> getDevice(IpAddress mcastIp, McastRole role, ConnectPoint source) {
1586 return mcastRoleStore.entrySet().stream()
1587 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
Charles Chanba59dd62018-05-10 22:19:49 +00001588 entry.getKey().source().equals(source) &&
1589 entry.getValue().value() == role)
Piere99511d2018-04-19 16:47:06 +02001590 .map(Entry::getKey).map(McastRoleStoreKey::deviceId).collect(Collectors.toSet());
1591 }
1592
1593 /**
Charles Chan72779502016-04-23 17:36:10 -07001594 * Gets device(s) of given role in given multicast group.
1595 *
1596 * @param mcastIp multicast IP
1597 * @param role multicast role
1598 * @return set of device ID or empty set if not found
1599 */
1600 private Set<DeviceId> getDevice(IpAddress mcastIp, McastRole role) {
1601 return mcastRoleStore.entrySet().stream()
1602 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
1603 entry.getValue().value() == role)
Piere99511d2018-04-19 16:47:06 +02001604 .map(Entry::getKey).map(McastRoleStoreKey::deviceId).collect(Collectors.toSet());
1605 }
1606
1607 /**
1608 * Gets source(s) of given role, given device in given multicast group.
1609 *
1610 * @param mcastIp multicast IP
1611 * @param deviceId device id
1612 * @param role multicast role
1613 * @return set of device ID or empty set if not found
1614 */
1615 private Set<ConnectPoint> getSources(IpAddress mcastIp, DeviceId deviceId, McastRole role) {
1616 return mcastRoleStore.entrySet().stream()
1617 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
1618 entry.getKey().deviceId().equals(deviceId) && entry.getValue().value() == role)
1619 .map(Entry::getKey).map(McastRoleStoreKey::source).collect(Collectors.toSet());
1620 }
1621
1622 /**
1623 * Gets source(s) of given multicast group.
1624 *
1625 * @param mcastIp multicast IP
1626 * @return set of device ID or empty set if not found
1627 */
1628 private Set<ConnectPoint> getSources(IpAddress mcastIp) {
1629 return mcastRoleStore.entrySet().stream()
1630 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp))
1631 .map(Entry::getKey).map(McastRoleStoreKey::source).collect(Collectors.toSet());
Charles Chan72779502016-04-23 17:36:10 -07001632 }
1633
1634 /**
1635 * Gets groups which is affected by the link down event.
1636 *
1637 * @param link link going down
1638 * @return a set of multicast IpAddress
1639 */
1640 private Set<IpAddress> getAffectedGroups(Link link) {
1641 DeviceId deviceId = link.src().deviceId();
1642 PortNumber port = link.src().port();
1643 return mcastNextObjStore.entrySet().stream()
1644 .filter(entry -> entry.getKey().deviceId().equals(deviceId) &&
Piere99511d2018-04-19 16:47:06 +02001645 mcastUtils.getPorts(entry.getValue().value().next()).contains(port))
1646 .map(Entry::getKey).map(McastStoreKey::mcastIp).collect(Collectors.toSet());
Charles Chan72779502016-04-23 17:36:10 -07001647 }
1648
1649 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +01001650 * Gets groups which are affected by the device down event.
1651 *
1652 * @param deviceId device going down
1653 * @return a set of multicast IpAddress
1654 */
1655 private Set<IpAddress> getAffectedGroups(DeviceId deviceId) {
1656 return mcastNextObjStore.entrySet().stream()
1657 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
Pier1f87aca2018-03-14 16:47:32 -07001658 .map(Entry::getKey).map(McastStoreKey::mcastIp)
Pier Luigi580fd8a2018-01-16 10:47:50 +01001659 .collect(Collectors.toSet());
1660 }
1661
1662 /**
Charles Chan72779502016-04-23 17:36:10 -07001663 * Gets the spine-facing port on ingress device of given multicast group.
1664 *
1665 * @param mcastIp multicast IP
Piere99511d2018-04-19 16:47:06 +02001666 * @param ingressDevice the ingress device
1667 * @param source the source connect point
Charles Chan72779502016-04-23 17:36:10 -07001668 * @return spine-facing port on ingress device
1669 */
Charles Chanba59dd62018-05-10 22:19:49 +00001670 private Set<PortNumber> ingressTransitPort(IpAddress mcastIp, DeviceId ingressDevice,
1671 ConnectPoint source) {
Pier1a7e0c02018-03-12 15:00:54 -07001672 ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
Charles Chan72779502016-04-23 17:36:10 -07001673 if (ingressDevice != null) {
Andrea Campanella5b4cd652018-06-05 14:19:21 +02001674 Versioned<NextObjective> nextObjVers = mcastNextObjStore.get(new McastStoreKey(mcastIp, ingressDevice,
1675 mcastUtils.assignedVlan(source)));
1676 if (nextObjVers == null) {
1677 log.warn("Absent next objective for {}", new McastStoreKey(mcastIp, ingressDevice,
1678 mcastUtils.assignedVlan(source)));
1679 return portBuilder.build();
1680 }
1681 NextObjective nextObj = nextObjVers.value();
Pier7b657162018-03-27 11:29:42 -07001682 Set<PortNumber> ports = mcastUtils.getPorts(nextObj.next());
Pier1a7e0c02018-03-12 15:00:54 -07001683 // Let's find out all the ingress-transit ports
Charles Chan72779502016-04-23 17:36:10 -07001684 for (PortNumber port : ports) {
1685 // Spine-facing port should have no subnet and no xconnect
Pier Luigi69f774d2018-02-28 12:10:50 +01001686 if (srManager.deviceConfiguration() != null &&
1687 srManager.deviceConfiguration().getPortSubnets(ingressDevice, port).isEmpty() &&
Charles Chan8d316332018-06-19 20:31:57 -07001688 (srManager.xconnectService == null ||
1689 !srManager.xconnectService.hasXconnect(new ConnectPoint(ingressDevice, port)))) {
Pier1a7e0c02018-03-12 15:00:54 -07001690 portBuilder.add(port);
Charles Chan72779502016-04-23 17:36:10 -07001691 }
1692 }
1693 }
Pier1a7e0c02018-03-12 15:00:54 -07001694 return portBuilder.build();
Charles Chan72779502016-04-23 17:36:10 -07001695 }
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001696
1697 /**
Pier28164682018-04-17 15:50:43 +02001698 * Verify if a given connect point is sink for this group.
1699 *
1700 * @param mcastIp group address
1701 * @param connectPoint connect point to be verified
Piere99511d2018-04-19 16:47:06 +02001702 * @param source source connect point
Pier28164682018-04-17 15:50:43 +02001703 * @return true if the connect point is sink of the group
1704 */
Charles Chanba59dd62018-05-10 22:19:49 +00001705 private boolean isSinkForGroup(IpAddress mcastIp, ConnectPoint connectPoint,
1706 ConnectPoint source) {
Piere99511d2018-04-19 16:47:06 +02001707 VlanId assignedVlan = mcastUtils.assignedVlan(connectPoint.deviceId().equals(source.deviceId()) ?
1708 source : null);
1709 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, connectPoint.deviceId(), assignedVlan);
Pier28164682018-04-17 15:50:43 +02001710 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
1711 return false;
1712 }
Pier28164682018-04-17 15:50:43 +02001713 NextObjective mcastNext = mcastNextObjStore.get(mcastStoreKey).value();
1714 return mcastUtils.getPorts(mcastNext.next()).contains(connectPoint.port());
1715 }
1716
1717 /**
Piere99511d2018-04-19 16:47:06 +02001718 * Verify if a given connect point is sink for this group and for this source.
1719 *
1720 * @param mcastIp group address
1721 * @param connectPoint connect point to be verified
1722 * @param source source connect point
1723 * @return true if the connect point is sink of the group
1724 */
Charles Chanba59dd62018-05-10 22:19:49 +00001725 private boolean isSinkForSource(IpAddress mcastIp, ConnectPoint connectPoint,
1726 ConnectPoint source) {
Piere99511d2018-04-19 16:47:06 +02001727 boolean isSink = isSinkForGroup(mcastIp, connectPoint, source);
1728 DeviceId device;
1729 if (connectPoint.deviceId().equals(source.deviceId())) {
1730 device = getDevice(mcastIp, INGRESS, source).stream()
1731 .filter(deviceId -> deviceId.equals(connectPoint.deviceId()))
1732 .findFirst().orElse(null);
1733 } else {
1734 device = getDevice(mcastIp, EGRESS, source).stream()
1735 .filter(deviceId -> deviceId.equals(connectPoint.deviceId()))
1736 .findFirst().orElse(null);
1737 }
1738 return isSink && device != null;
1739 }
1740
1741 /**
1742 * Verify if a sink is reachable from this source.
1743 *
1744 * @param mcastIp group address
1745 * @param sink connect point to be verified
1746 * @param source source connect point
1747 * @return true if the connect point is reachable from the source
1748 */
Charles Chanba59dd62018-05-10 22:19:49 +00001749 private boolean isSinkReachable(IpAddress mcastIp, ConnectPoint sink,
1750 ConnectPoint source) {
Piere99511d2018-04-19 16:47:06 +02001751 return sink.deviceId().equals(source.deviceId()) ||
1752 getPath(source.deviceId(), sink.deviceId(), mcastIp, null, source).isPresent();
1753 }
1754
1755 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +01001756 * Updates filtering objective for given device and port.
1757 * It is called in general when the mcast config has been
1758 * changed.
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001759 *
1760 * @param deviceId device ID
1761 * @param portNum ingress port number
1762 * @param vlanId assigned VLAN ID
1763 * @param install true to add, false to remove
1764 */
Charles Chanba59dd62018-05-10 22:19:49 +00001765 public void updateFilterToDevice(DeviceId deviceId, PortNumber portNum,
1766 VlanId vlanId, boolean install) {
Pier Luigi35dab3f2018-01-25 16:16:02 +01001767 lastMcastChange = Instant.now();
1768 mcastLock();
1769 try {
Piere99511d2018-04-19 16:47:06 +02001770 // Iterates over the route and updates properly the filtering objective on the source device.
Pier Luigi35dab3f2018-01-25 16:16:02 +01001771 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
Pierdb27b8d2018-04-17 16:29:56 +02001772 log.debug("Update filter for {}", mcastRoute.group());
Pierdb27b8d2018-04-17 16:29:56 +02001773 if (!mcastUtils.isLeader(mcastRoute.group())) {
1774 log.debug("Skip {} due to lack of leadership", mcastRoute.group());
1775 return;
1776 }
Piere99511d2018-04-19 16:47:06 +02001777 // Get the sources and for each one update properly the filtering objectives
1778 Set<ConnectPoint> sources = srManager.multicastRouteService.sources(mcastRoute);
1779 sources.forEach(source -> {
1780 if (source.deviceId().equals(deviceId) && source.port().equals(portNum)) {
1781 if (install) {
1782 mcastUtils.addFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), INGRESS);
1783 } else {
1784 mcastUtils.removeFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), null);
1785 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001786 }
Piere99511d2018-04-19 16:47:06 +02001787 });
Pier Luigi35dab3f2018-01-25 16:16:02 +01001788 });
1789 } finally {
1790 mcastUnlock();
1791 }
1792 }
1793
1794 /**
1795 * Performs bucket verification operation for all mcast groups in the devices.
1796 * Firstly, it verifies that mcast is stable before trying verification operation.
1797 * Verification consists in creating new nexts with VERIFY operation. Actually,
1798 * the operation is totally delegated to the driver.
1799 */
Piere99511d2018-04-19 16:47:06 +02001800 private final class McastBucketCorrector implements Runnable {
Pier Luigi35dab3f2018-01-25 16:16:02 +01001801
pierc32ef422020-01-27 17:45:03 +01001802 private static final int MAX_VERIFY_ON_FLIGHT = 10;
1803 private final AtomicInteger verifyOnFlight = new AtomicInteger(0);
1804 // Define the context used for the back pressure mechanism
1805 private final ObjectiveContext context = new DefaultObjectiveContext(
1806 (objective) -> {
1807 synchronized (verifyOnFlight) {
1808 verifyOnFlight.decrementAndGet();
1809 verifyOnFlight.notify();
1810 }
1811 },
1812 (objective, error) -> {
1813 synchronized (verifyOnFlight) {
1814 verifyOnFlight.decrementAndGet();
1815 verifyOnFlight.notify();
1816 }
1817 });
1818
Pier Luigi35dab3f2018-01-25 16:16:02 +01001819 @Override
1820 public void run() {
pierc32ef422020-01-27 17:45:03 +01001821 if (!isMcastStable() || wasBktCorrRunning()) {
Pier Luigi35dab3f2018-01-25 16:16:02 +01001822 return;
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001823 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001824 mcastLock();
1825 try {
1826 // Iterates over the routes and verify the related next objectives
pierc32ef422020-01-27 17:45:03 +01001827 for (McastRoute mcastRoute : srManager.multicastRouteService.getRoutes()) {
1828 IpAddress mcastIp = mcastRoute.group();
1829 log.trace("Running mcast buckets corrector for mcast group: {}", mcastIp);
1830 // Verify leadership on the operation
1831 if (!mcastUtils.isLeader(mcastIp)) {
1832 log.trace("Skip {} due to lack of leadership", mcastIp);
1833 continue;
1834 }
1835 // Get sources and sinks from Mcast Route Service and warn about errors
1836 Set<ConnectPoint> sources = mcastUtils.getSources(mcastIp);
1837 Set<ConnectPoint> sinks = mcastUtils.getSinks(mcastIp).values().stream()
1838 .flatMap(Collection::stream).collect(Collectors.toSet());
1839 // Do not proceed if sources of this group are missing
1840 if (sources.isEmpty()) {
1841 if (!sinks.isEmpty()) {
1842 log.warn("Unable to run buckets corrector. " +
1843 "Missing source {} for group {}", sources, mcastIp);
Piere99511d2018-04-19 16:47:06 +02001844 }
pierc32ef422020-01-27 17:45:03 +01001845 continue;
1846 }
1847 // For each group we get current information in the store
1848 // and issue a check of the next objectives in place
1849 Set<McastStoreKey> processedKeys = Sets.newHashSet();
1850 for (ConnectPoint source : sources) {
1851 Set<DeviceId> ingressDevices = getDevice(mcastIp, INGRESS, source);
1852 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
1853 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
1854 // Do not proceed if ingress devices are missing
1855 if (ingressDevices.isEmpty()) {
Pier Luigi92e69be2018-03-02 12:53:37 +01001856 if (!sinks.isEmpty()) {
1857 log.warn("Unable to run buckets corrector. " +
pierc32ef422020-01-27 17:45:03 +01001858 "Missing ingress {} for source {} and for group {}",
1859 ingressDevices, source, mcastIp);
Pier Luigi92e69be2018-03-02 12:53:37 +01001860 }
pierc32ef422020-01-27 17:45:03 +01001861 continue;
Pier Luigi35dab3f2018-01-25 16:16:02 +01001862 }
pierc32ef422020-01-27 17:45:03 +01001863 // Create the set of the devices to be processed
1864 ImmutableSet.Builder<DeviceId> devicesBuilder = ImmutableSet.builder();
1865 if (!ingressDevices.isEmpty()) {
1866 devicesBuilder.addAll(ingressDevices);
1867 }
1868 if (!transitDevices.isEmpty()) {
1869 devicesBuilder.addAll(transitDevices);
1870 }
1871 if (!egressDevices.isEmpty()) {
1872 devicesBuilder.addAll(egressDevices);
1873 }
1874 Set<DeviceId> devicesToProcess = devicesBuilder.build();
1875 for (DeviceId deviceId : devicesToProcess) {
1876 if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
1877 log.trace("Skipping Bucket corrector for unconfigured device {}", deviceId);
Piere99511d2018-04-19 16:47:06 +02001878 return;
Pier Luigi35dab3f2018-01-25 16:16:02 +01001879 }
pierc32ef422020-01-27 17:45:03 +01001880 synchronized (verifyOnFlight) {
1881 while (verifyOnFlight.get() == MAX_VERIFY_ON_FLIGHT) {
1882 verifyOnFlight.wait();
Vignesh Ethiraj75790122019-08-26 12:18:42 +00001883 }
pierc32ef422020-01-27 17:45:03 +01001884 }
1885 VlanId assignedVlan = mcastUtils.assignedVlan(deviceId.equals(source.deviceId()) ?
1886 source : null);
1887 McastStoreKey currentKey = new McastStoreKey(mcastIp, deviceId, assignedVlan);
1888 // Check if we already processed this next - trees merge at some point
1889 if (processedKeys.contains(currentKey)) {
1890 continue;
1891 }
1892 // Verify the nextobjective or skip to next device
1893 if (mcastNextObjStore.containsKey(currentKey)) {
1894 NextObjective currentNext = mcastNextObjStore.get(currentKey).value();
1895 // Rebuild the next objective using assigned vlan
1896 currentNext = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
1897 mcastUtils.getPorts(currentNext.next()), currentNext.id()).verify(context);
1898 // Send to the flowobjective service
1899 srManager.flowObjectiveService.next(deviceId, currentNext);
1900 verifyOnFlight.incrementAndGet();
1901 log.trace("Verify on flight {}", verifyOnFlight);
1902 processedKeys.add(currentKey);
1903 } else {
1904 log.warn("Unable to run buckets corrector. " +
1905 "Missing next for {}, for source {} and for group {}",
1906 deviceId, source, mcastIp);
1907 }
1908 }
1909 }
1910 }
1911 } catch (InterruptedException e) {
1912 log.warn("BktCorr has been interrupted");
Pier Luigi35dab3f2018-01-25 16:16:02 +01001913 } finally {
pierc32ef422020-01-27 17:45:03 +01001914 lastBktCorrExecution = Instant.now();
Pier Luigi35dab3f2018-01-25 16:16:02 +01001915 mcastUnlock();
1916 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001917 }
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001918 }
Pier Luigi0f9635b2018-01-15 18:06:43 +01001919
Piere99511d2018-04-19 16:47:06 +02001920 /**
1921 * Returns the associated next ids to the mcast groups or to the single
1922 * group if mcastIp is present.
1923 *
1924 * @param mcastIp the group ip
1925 * @return the mapping mcastIp-device to next id
1926 */
Charles Chan0b1dd7e2018-08-19 19:21:46 -07001927 public Map<McastStoreKey, Integer> getNextIds(IpAddress mcastIp) {
Pier Luigi0f9635b2018-01-15 18:06:43 +01001928 if (mcastIp != null) {
1929 return mcastNextObjStore.entrySet().stream()
1930 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
Piere99511d2018-04-19 16:47:06 +02001931 .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().value().id()));
Pier Luigi0f9635b2018-01-15 18:06:43 +01001932 }
Pier Luigi0f9635b2018-01-15 18:06:43 +01001933 return mcastNextObjStore.entrySet().stream()
Piere99511d2018-04-19 16:47:06 +02001934 .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().value().id()));
Pier Luigi0f9635b2018-01-15 18:06:43 +01001935 }
1936
Pier71c55772018-04-17 17:25:22 +02001937 /**
Charles Chan0b1dd7e2018-08-19 19:21:46 -07001938 * Removes given next ID from mcast next id store.
1939 *
1940 * @param nextId next id
1941 */
1942 public void removeNextId(int nextId) {
1943 mcastNextObjStore.entrySet().forEach(e -> {
1944 if (e.getValue().value().id() == nextId) {
1945 mcastNextObjStore.remove(e.getKey());
1946 }
1947 });
1948 }
1949
1950 /**
Piere99511d2018-04-19 16:47:06 +02001951 * Returns the associated roles to the mcast groups.
1952 *
1953 * @param mcastIp the group ip
1954 * @param sourcecp the source connect point
1955 * @return the mapping mcastIp-device to mcast role
1956 */
Charles Chanba59dd62018-05-10 22:19:49 +00001957 public Map<McastRoleStoreKey, McastRole> getMcastRoles(IpAddress mcastIp,
1958 ConnectPoint sourcecp) {
Piere99511d2018-04-19 16:47:06 +02001959 if (mcastIp != null) {
1960 Map<McastRoleStoreKey, McastRole> roles = mcastRoleStore.entrySet().stream()
1961 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
1962 .collect(Collectors.toMap(entry -> new McastRoleStoreKey(entry.getKey().mcastIp(),
1963 entry.getKey().deviceId(), entry.getKey().source()), entry -> entry.getValue().value()));
1964 if (sourcecp != null) {
1965 roles = roles.entrySet().stream()
1966 .filter(mcastEntry -> sourcecp.equals(mcastEntry.getKey().source()))
1967 .collect(Collectors.toMap(entry -> new McastRoleStoreKey(entry.getKey().mcastIp(),
1968 entry.getKey().deviceId(), entry.getKey().source()), Entry::getValue));
1969 }
1970 return roles;
1971 }
1972 return mcastRoleStore.entrySet().stream()
1973 .collect(Collectors.toMap(entry -> new McastRoleStoreKey(entry.getKey().mcastIp(),
1974 entry.getKey().deviceId(), entry.getKey().source()), entry -> entry.getValue().value()));
1975 }
1976
Pier71c55772018-04-17 17:25:22 +02001977 /**
1978 * Returns the associated trees to the mcast group.
1979 *
1980 * @param mcastIp the group ip
1981 * @param sourcecp the source connect point
1982 * @return the mapping egress point to mcast path
1983 */
Charles Chanba59dd62018-05-10 22:19:49 +00001984 public Multimap<ConnectPoint, List<ConnectPoint>> getMcastTrees(IpAddress mcastIp,
1985 ConnectPoint sourcecp) {
Pier71c55772018-04-17 17:25:22 +02001986 Multimap<ConnectPoint, List<ConnectPoint>> mcastTrees = HashMultimap.create();
Pier71c55772018-04-17 17:25:22 +02001987 Set<ConnectPoint> sources = mcastUtils.getSources(mcastIp);
Pier71c55772018-04-17 17:25:22 +02001988 if (sourcecp != null) {
1989 sources = sources.stream()
Piere99511d2018-04-19 16:47:06 +02001990 .filter(source -> source.equals(sourcecp)).collect(Collectors.toSet());
Pier71c55772018-04-17 17:25:22 +02001991 }
Pier71c55772018-04-17 17:25:22 +02001992 if (!sources.isEmpty()) {
1993 sources.forEach(source -> {
Pier71c55772018-04-17 17:25:22 +02001994 Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
1995 Set<DeviceId> visited = Sets.newHashSet();
1996 List<ConnectPoint> currentPath = Lists.newArrayList(source);
Charles Chan0b1dd7e2018-08-19 19:21:46 -07001997 mcastUtils.buildMcastPaths(mcastNextObjStore.asJavaMap(), source.deviceId(), visited, mcastPaths,
1998 currentPath, mcastIp, source);
Pier71c55772018-04-17 17:25:22 +02001999 mcastPaths.forEach(mcastTrees::put);
2000 });
2001 }
2002 return mcastTrees;
2003 }
2004
2005 /**
Pierdb27b8d2018-04-17 16:29:56 +02002006 * Return the leaders of the mcast groups.
2007 *
2008 * @param mcastIp the group ip
2009 * @return the mapping group-node
2010 */
2011 public Map<IpAddress, NodeId> getMcastLeaders(IpAddress mcastIp) {
2012 return mcastUtils.getMcastLeaders(mcastIp);
2013 }
Charles Chanc91c8782016-03-30 17:54:24 -07002014}