blob: 52618f411d70475d08b7d94a47f46d7a5970cb98 [file] [log] [blame]
Charles Chanc91c8782016-03-30 17:54:24 -07001/*
Pier Luigi69f774d2018-02-28 12:10:50 +01002 * Copyright 2018-present Open Networking Foundation
Charles Chanc91c8782016-03-30 17:54:24 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Pier Luigi69f774d2018-02-28 12:10:50 +010017package org.onosproject.segmentrouting.mcast;
Charles Chanc91c8782016-03-30 17:54:24 -070018
Piere99511d2018-04-19 16:47:06 +020019import com.google.common.base.Objects;
Pier Luigid29ca7c2018-02-28 17:24:03 +010020import com.google.common.cache.Cache;
21import com.google.common.cache.CacheBuilder;
22import com.google.common.cache.RemovalCause;
23import com.google.common.cache.RemovalNotification;
Pier71c55772018-04-17 17:25:22 +020024import com.google.common.collect.HashMultimap;
Pier7b657162018-03-27 11:29:42 -070025import com.google.common.collect.ImmutableList;
Charles Chanc91c8782016-03-30 17:54:24 -070026import com.google.common.collect.ImmutableSet;
27import com.google.common.collect.Lists;
Pier Luigi91573e12018-01-23 16:06:38 +010028import com.google.common.collect.Maps;
Pier71c55772018-04-17 17:25:22 +020029import com.google.common.collect.Multimap;
Charles Chanc91c8782016-03-30 17:54:24 -070030import com.google.common.collect.Sets;
Charles Chanc91c8782016-03-30 17:54:24 -070031import org.onlab.packet.IpAddress;
Charles Chanc91c8782016-03-30 17:54:24 -070032import org.onlab.packet.VlanId;
33import org.onlab.util.KryoNamespace;
Pierdb27b8d2018-04-17 16:29:56 +020034import org.onosproject.cluster.NodeId;
Charles Chanc91c8782016-03-30 17:54:24 -070035import org.onosproject.core.ApplicationId;
36import org.onosproject.core.CoreService;
Pier1f87aca2018-03-14 16:47:32 -070037import org.onosproject.mcast.api.McastEvent;
38import org.onosproject.mcast.api.McastRoute;
Pier7b657162018-03-27 11:29:42 -070039import org.onosproject.mcast.api.McastRouteData;
Pier1f87aca2018-03-14 16:47:32 -070040import org.onosproject.mcast.api.McastRouteUpdate;
Harshada Chaundkar9204f312019-07-02 16:01:24 +000041import org.onosproject.net.Device;
Charles Chanba59dd62018-05-10 22:19:49 +000042import org.onosproject.net.HostId;
Charles Chanc91c8782016-03-30 17:54:24 -070043import org.onosproject.net.ConnectPoint;
44import org.onosproject.net.DeviceId;
45import org.onosproject.net.Link;
46import org.onosproject.net.Path;
Harshada Chaundkar9204f312019-07-02 16:01:24 +000047import org.onosproject.net.Port;
Charles Chanc91c8782016-03-30 17:54:24 -070048import org.onosproject.net.PortNumber;
Charles Chan72779502016-04-23 17:36:10 -070049import org.onosproject.net.flowobjective.DefaultObjectiveContext;
Charles Chanc91c8782016-03-30 17:54:24 -070050import org.onosproject.net.flowobjective.ForwardingObjective;
51import org.onosproject.net.flowobjective.NextObjective;
Charles Chan72779502016-04-23 17:36:10 -070052import org.onosproject.net.flowobjective.ObjectiveContext;
Pier1f87aca2018-03-14 16:47:32 -070053import org.onosproject.net.topology.LinkWeigher;
Pier Luigid8a15162018-02-15 16:33:08 +010054import org.onosproject.net.topology.Topology;
Charles Chanc91c8782016-03-30 17:54:24 -070055import org.onosproject.net.topology.TopologyService;
Pier1f87aca2018-03-14 16:47:32 -070056import org.onosproject.segmentrouting.SRLinkWeigher;
Pier Luigi69f774d2018-02-28 12:10:50 +010057import org.onosproject.segmentrouting.SegmentRoutingManager;
Charles Chanc91c8782016-03-30 17:54:24 -070058import org.onosproject.store.serializers.KryoNamespaces;
59import org.onosproject.store.service.ConsistentMap;
Harshada Chaundkar9204f312019-07-02 16:01:24 +000060import org.onosproject.store.service.DistributedSet;
Charles Chanc91c8782016-03-30 17:54:24 -070061import org.onosproject.store.service.Serializer;
Andrea Campanella5b4cd652018-06-05 14:19:21 +020062import org.onosproject.store.service.Versioned;
Charles Chanc91c8782016-03-30 17:54:24 -070063import org.slf4j.Logger;
64import org.slf4j.LoggerFactory;
65
Pier Luigi35dab3f2018-01-25 16:16:02 +010066import java.time.Instant;
Charles Chanc91c8782016-03-30 17:54:24 -070067import java.util.Collection;
68import java.util.Collections;
Pier Luigi91573e12018-01-23 16:06:38 +010069import java.util.Comparator;
Harshada Chaundkar9204f312019-07-02 16:01:24 +000070import java.util.Iterator;
Charles Chanc91c8782016-03-30 17:54:24 -070071import java.util.List;
Charles Chan72779502016-04-23 17:36:10 -070072import java.util.Map;
Pier1f87aca2018-03-14 16:47:32 -070073import java.util.Map.Entry;
Charles Chanc91c8782016-03-30 17:54:24 -070074import java.util.Optional;
75import java.util.Set;
Pier Luigi35dab3f2018-01-25 16:16:02 +010076import java.util.concurrent.ScheduledExecutorService;
77import java.util.concurrent.TimeUnit;
pierc32ef422020-01-27 17:45:03 +010078import java.util.concurrent.atomic.AtomicInteger;
Pier Luigi35dab3f2018-01-25 16:16:02 +010079import java.util.concurrent.locks.Lock;
80import java.util.concurrent.locks.ReentrantLock;
Charles Chan72779502016-04-23 17:36:10 -070081import java.util.stream.Collectors;
82
Pier Luigi35dab3f2018-01-25 16:16:02 +010083import static java.util.concurrent.Executors.newScheduledThreadPool;
84import static org.onlab.util.Tools.groupedThreads;
Charles Chanba59dd62018-05-10 22:19:49 +000085
Pierdb27b8d2018-04-17 16:29:56 +020086import static org.onosproject.mcast.api.McastEvent.Type.ROUTE_ADDED;
Pier7b657162018-03-27 11:29:42 -070087import static org.onosproject.mcast.api.McastEvent.Type.ROUTE_REMOVED;
Andrea Campanellaef30d7a2018-04-27 14:44:15 +020088import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_ADDED;
89import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_REMOVED;
Charles Chanba59dd62018-05-10 22:19:49 +000090import static org.onosproject.mcast.api.McastEvent.Type.SINKS_ADDED;
91import static org.onosproject.mcast.api.McastEvent.Type.SINKS_REMOVED;
92
Pier979e61a2018-03-07 11:42:50 +010093import static org.onosproject.segmentrouting.mcast.McastRole.EGRESS;
94import static org.onosproject.segmentrouting.mcast.McastRole.INGRESS;
95import static org.onosproject.segmentrouting.mcast.McastRole.TRANSIT;
Charles Chanc91c8782016-03-30 17:54:24 -070096
97/**
Pier Luigi69f774d2018-02-28 12:10:50 +010098 * Handles Multicast related events.
Charles Chanc91c8782016-03-30 17:54:24 -070099 */
Charles Chan1eaf4802016-04-18 13:44:03 -0700100public class McastHandler {
101 private static final Logger log = LoggerFactory.getLogger(McastHandler.class);
Charles Chanc91c8782016-03-30 17:54:24 -0700102 private final SegmentRoutingManager srManager;
Charles Chan82f19972016-05-17 13:13:55 -0700103 private final TopologyService topologyService;
Pierdb27b8d2018-04-17 16:29:56 +0200104 private final McastUtils mcastUtils;
Charles Chan72779502016-04-23 17:36:10 -0700105 private final ConsistentMap<McastStoreKey, NextObjective> mcastNextObjStore;
Piere99511d2018-04-19 16:47:06 +0200106 private final ConsistentMap<McastRoleStoreKey, McastRole> mcastRoleStore;
Harshada Chaundkar9204f312019-07-02 16:01:24 +0000107 private final DistributedSet<McastFilteringObjStoreKey> mcastFilteringObjStore;
Charles Chan72779502016-04-23 17:36:10 -0700108
Pier Luigid29ca7c2018-02-28 17:24:03 +0100109 // Wait time for the cache
110 private static final int WAIT_TIME_MS = 1000;
Pier7b657162018-03-27 11:29:42 -0700111
Piere99511d2018-04-19 16:47:06 +0200112 //The mcastEventCache is implemented to avoid race condition by giving more time
113 // to the underlying subsystems to process previous calls.
Pier Luigid29ca7c2018-02-28 17:24:03 +0100114 private Cache<McastCacheKey, McastEvent> mcastEventCache = CacheBuilder.newBuilder()
115 .expireAfterWrite(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
116 .removalListener((RemovalNotification<McastCacheKey, McastEvent> notification) -> {
Pier Luigid29ca7c2018-02-28 17:24:03 +0100117 IpAddress mcastIp = notification.getKey().mcastIp();
Pier7b657162018-03-27 11:29:42 -0700118 HostId sink = notification.getKey().sinkHost();
Pier Luigid29ca7c2018-02-28 17:24:03 +0100119 McastEvent mcastEvent = notification.getValue();
120 RemovalCause cause = notification.getCause();
Piere99511d2018-04-19 16:47:06 +0200121 // If it expires or it has been replaced, we deque the event - no when evicted
Pier Luigid29ca7c2018-02-28 17:24:03 +0100122 switch (notification.getCause()) {
123 case REPLACED:
124 case EXPIRED:
125 dequeueMcastEvent(mcastEvent);
126 break;
127 default:
128 break;
129 }
130 }).build();
131
132 private void enqueueMcastEvent(McastEvent mcastEvent) {
Pier1f87aca2018-03-14 16:47:32 -0700133 final McastRouteUpdate mcastRouteUpdate = mcastEvent.subject();
Pier7b657162018-03-27 11:29:42 -0700134 final McastRouteUpdate mcastRoutePrevUpdate = mcastEvent.prevSubject();
135 final IpAddress group = mcastRoutePrevUpdate.route().group();
Pier7b657162018-03-27 11:29:42 -0700136 ImmutableSet.Builder<HostId> sinksBuilder = ImmutableSet.builder();
Pier1f87aca2018-03-14 16:47:32 -0700137 if (mcastEvent.type() == SOURCES_ADDED ||
138 mcastEvent.type() == SOURCES_REMOVED) {
Piere99511d2018-04-19 16:47:06 +0200139 // Current subject and prev just differ for the source connect points
140 sinksBuilder.addAll(mcastRouteUpdate.sinks().keySet());
Pier7b657162018-03-27 11:29:42 -0700141 } else if (mcastEvent.type() == SINKS_ADDED) {
Pier7b657162018-03-27 11:29:42 -0700142 mcastRouteUpdate.sinks().forEach(((hostId, connectPoints) -> {
143 // Get the previous locations and verify if there are changes
144 Set<ConnectPoint> prevConnectPoints = mcastRoutePrevUpdate.sinks().get(hostId);
145 Set<ConnectPoint> changes = Sets.difference(connectPoints, prevConnectPoints != null ?
146 prevConnectPoints : Collections.emptySet());
147 if (!changes.isEmpty()) {
148 sinksBuilder.add(hostId);
Pier1f87aca2018-03-14 16:47:32 -0700149 }
Pier7b657162018-03-27 11:29:42 -0700150 }));
151 } else if (mcastEvent.type() == SINKS_REMOVED) {
Pier7b657162018-03-27 11:29:42 -0700152 mcastRoutePrevUpdate.sinks().forEach(((hostId, connectPoints) -> {
153 // Get the current locations and verify if there are changes
154 Set<ConnectPoint> currentConnectPoints = mcastRouteUpdate.sinks().get(hostId);
155 Set<ConnectPoint> changes = Sets.difference(connectPoints, currentConnectPoints != null ?
156 currentConnectPoints : Collections.emptySet());
157 if (!changes.isEmpty()) {
158 sinksBuilder.add(hostId);
159 }
160 }));
161 } else if (mcastEvent.type() == ROUTE_REMOVED) {
162 // Current subject is null, just take the previous host ids
163 sinksBuilder.addAll(mcastRoutePrevUpdate.sinks().keySet());
Pier Luigid29ca7c2018-02-28 17:24:03 +0100164 }
Pier Luigid29ca7c2018-02-28 17:24:03 +0100165 sinksBuilder.build().forEach(sink -> {
Pier1f87aca2018-03-14 16:47:32 -0700166 McastCacheKey cacheKey = new McastCacheKey(group, sink);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100167 mcastEventCache.put(cacheKey, mcastEvent);
168 });
169 }
170
171 private void dequeueMcastEvent(McastEvent mcastEvent) {
Pier7b657162018-03-27 11:29:42 -0700172 final McastRouteUpdate mcastUpdate = mcastEvent.subject();
173 final McastRouteUpdate mcastPrevUpdate = mcastEvent.prevSubject();
Pier7b657162018-03-27 11:29:42 -0700174 IpAddress mcastIp = mcastPrevUpdate.route().group();
Pier7b657162018-03-27 11:29:42 -0700175 Set<ConnectPoint> prevSinks = mcastPrevUpdate.sinks()
Piere99511d2018-04-19 16:47:06 +0200176 .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
177 Set<ConnectPoint> prevSources = mcastPrevUpdate.sources()
178 .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
179 Set<ConnectPoint> sources;
Pier Luigid29ca7c2018-02-28 17:24:03 +0100180 switch (mcastEvent.type()) {
Pier1f87aca2018-03-14 16:47:32 -0700181 case SOURCES_ADDED:
Piere99511d2018-04-19 16:47:06 +0200182 sources = mcastUpdate.sources()
183 .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
184 Set<ConnectPoint> sourcesToBeAdded = Sets.difference(sources, prevSources);
185 processSourcesAddedInternal(sourcesToBeAdded, mcastIp, mcastUpdate.sinks());
Pier Luigid29ca7c2018-02-28 17:24:03 +0100186 break;
Pier1f87aca2018-03-14 16:47:32 -0700187 case SOURCES_REMOVED:
Piere99511d2018-04-19 16:47:06 +0200188 sources = mcastUpdate.sources()
189 .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
190 Set<ConnectPoint> sourcesToBeRemoved = Sets.difference(prevSources, sources);
191 processSourcesRemovedInternal(sourcesToBeRemoved, sources, mcastIp, mcastUpdate.sinks());
Pier Luigid29ca7c2018-02-28 17:24:03 +0100192 break;
193 case ROUTE_REMOVED:
Piere99511d2018-04-19 16:47:06 +0200194 processRouteRemovedInternal(prevSources, mcastIp);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100195 break;
Pier1f87aca2018-03-14 16:47:32 -0700196 case SINKS_ADDED:
Piere99511d2018-04-19 16:47:06 +0200197 processSinksAddedInternal(prevSources, mcastIp, mcastUpdate.sinks(), prevSinks);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100198 break;
Pier1f87aca2018-03-14 16:47:32 -0700199 case SINKS_REMOVED:
Piere99511d2018-04-19 16:47:06 +0200200 processSinksRemovedInternal(prevSources, mcastIp, mcastUpdate.sinks(), mcastPrevUpdate.sinks());
Pier Luigid29ca7c2018-02-28 17:24:03 +0100201 break;
202 default:
203 break;
204 }
205 }
206
Pier Luigi35dab3f2018-01-25 16:16:02 +0100207 // Mcast lock to serialize local operations
208 private final Lock mcastLock = new ReentrantLock();
Pier Luigi35dab3f2018-01-25 16:16:02 +0100209 private void mcastLock() {
210 mcastLock.lock();
211 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100212 private void mcastUnlock() {
213 mcastLock.unlock();
214 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100215 // Stability threshold for Mcast. Seconds
216 private static final long MCAST_STABLITY_THRESHOLD = 5;
217 // Last change done
218 private Instant lastMcastChange = Instant.now();
pierc32ef422020-01-27 17:45:03 +0100219 // Last bucker corrector execution
220 private Instant lastBktCorrExecution = Instant.now();
Pier Luigi35dab3f2018-01-25 16:16:02 +0100221
222 /**
223 * Determines if mcast in the network has been stable in the last
224 * MCAST_STABLITY_THRESHOLD seconds, by comparing the current time
225 * to the last mcast change timestamp.
226 *
227 * @return true if stable
228 */
229 private boolean isMcastStable() {
230 long last = (long) (lastMcastChange.toEpochMilli() / 1000.0);
231 long now = (long) (Instant.now().toEpochMilli() / 1000.0);
pierc32ef422020-01-27 17:45:03 +0100232 log.trace("Multicast stable since {}s", now - last);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100233 return (now - last) > MCAST_STABLITY_THRESHOLD;
234 }
235
pierc32ef422020-01-27 17:45:03 +0100236 /**
237 * Assures there are always MCAST_VERIFY_INTERVAL seconds between each execution,
238 * by comparing the current time with the last corrector execution.
239 *
240 * @return true if stable
241 */
242 private boolean wasBktCorrRunning() {
243 long last = (long) (lastBktCorrExecution.toEpochMilli() / 1000.0);
244 long now = (long) (Instant.now().toEpochMilli() / 1000.0);
245 log.trace("McastBucketCorrector executed {}s ago", now - last);
246 return (now - last) < MCAST_VERIFY_INTERVAL;
247 }
248
Piere99511d2018-04-19 16:47:06 +0200249 // Verify interval for Mcast bucket corrector
Pier Luigi35dab3f2018-01-25 16:16:02 +0100250 private static final long MCAST_VERIFY_INTERVAL = 30;
Piere99511d2018-04-19 16:47:06 +0200251 // Executor for mcast bucket corrector and for cache
Pier Luigi35dab3f2018-01-25 16:16:02 +0100252 private ScheduledExecutorService executorService
Pier Luigid29ca7c2018-02-28 17:24:03 +0100253 = newScheduledThreadPool(1, groupedThreads("mcastWorker", "mcastWorker-%d", log));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100254
Charles Chan72779502016-04-23 17:36:10 -0700255 /**
Charles Chanc91c8782016-03-30 17:54:24 -0700256 * Constructs the McastEventHandler.
257 *
258 * @param srManager Segment Routing manager
259 */
Charles Chan1eaf4802016-04-18 13:44:03 -0700260 public McastHandler(SegmentRoutingManager srManager) {
Pier7b657162018-03-27 11:29:42 -0700261 ApplicationId coreAppId = srManager.coreService.getAppId(CoreService.CORE_APP_NAME);
Charles Chanc91c8782016-03-30 17:54:24 -0700262 this.srManager = srManager;
Charles Chanc91c8782016-03-30 17:54:24 -0700263 this.topologyService = srManager.topologyService;
Pier7b657162018-03-27 11:29:42 -0700264 KryoNamespace.Builder mcastKryo = new KryoNamespace.Builder()
Charles Chanc91c8782016-03-30 17:54:24 -0700265 .register(KryoNamespaces.API)
Piere99511d2018-04-19 16:47:06 +0200266 .register(new McastStoreKeySerializer(), McastStoreKey.class);
Pier7b657162018-03-27 11:29:42 -0700267 mcastNextObjStore = srManager.storageService
Charles Chan72779502016-04-23 17:36:10 -0700268 .<McastStoreKey, NextObjective>consistentMapBuilder()
Charles Chanc91c8782016-03-30 17:54:24 -0700269 .withName("onos-mcast-nextobj-store")
Charles Chan4922a172016-05-23 16:45:45 -0700270 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-NextObj")))
Charles Chanc91c8782016-03-30 17:54:24 -0700271 .build();
Piere99511d2018-04-19 16:47:06 +0200272 mcastKryo = new KryoNamespace.Builder()
273 .register(KryoNamespaces.API)
274 .register(new McastRoleStoreKeySerializer(), McastRoleStoreKey.class)
275 .register(McastRole.class);
Pier7b657162018-03-27 11:29:42 -0700276 mcastRoleStore = srManager.storageService
Piere99511d2018-04-19 16:47:06 +0200277 .<McastRoleStoreKey, McastRole>consistentMapBuilder()
Charles Chan72779502016-04-23 17:36:10 -0700278 .withName("onos-mcast-role-store")
Charles Chan4922a172016-05-23 16:45:45 -0700279 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-Role")))
Charles Chan72779502016-04-23 17:36:10 -0700280 .build();
Harshada Chaundkar9204f312019-07-02 16:01:24 +0000281 mcastKryo = new KryoNamespace.Builder()
282 .register(KryoNamespaces.API)
283 .register(new McastFilteringObjStoreSerializer(), McastFilteringObjStoreKey.class);
284 mcastFilteringObjStore = srManager.storageService
285 .<McastFilteringObjStoreKey>setBuilder()
286 .withName("onos-mcast-filtering-store")
287 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-FilteringObj")))
288 .build()
289 .asDistributedSet();
Pier7b657162018-03-27 11:29:42 -0700290 mcastUtils = new McastUtils(srManager, coreAppId, log);
Piere99511d2018-04-19 16:47:06 +0200291 // Init the executor service, the buckets corrector and schedule the clean up
Pier Luigi35dab3f2018-01-25 16:16:02 +0100292 executorService.scheduleWithFixedDelay(new McastBucketCorrector(), 10,
Pier7b657162018-03-27 11:29:42 -0700293 MCAST_VERIFY_INTERVAL, TimeUnit.SECONDS);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100294 executorService.scheduleAtFixedRate(mcastEventCache::cleanUp, 0,
295 WAIT_TIME_MS, TimeUnit.MILLISECONDS);
Charles Chan72779502016-04-23 17:36:10 -0700296 }
297
298 /**
Piere99511d2018-04-19 16:47:06 +0200299 * Read initial multicast configuration from mcast store.
Charles Chan72779502016-04-23 17:36:10 -0700300 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100301 public void init() {
Pier7b657162018-03-27 11:29:42 -0700302 lastMcastChange = Instant.now();
303 mcastLock();
304 try {
305 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
Piere99511d2018-04-19 16:47:06 +0200306 log.debug("Init group {}", mcastRoute.group());
Pierdb27b8d2018-04-17 16:29:56 +0200307 if (!mcastUtils.isLeader(mcastRoute.group())) {
308 log.debug("Skip {} due to lack of leadership", mcastRoute.group());
309 return;
310 }
Pier7b657162018-03-27 11:29:42 -0700311 McastRouteData mcastRouteData = srManager.multicastRouteService.routeData(mcastRoute);
Piere99511d2018-04-19 16:47:06 +0200312 // For each source process the mcast tree
313 srManager.multicastRouteService.sources(mcastRoute).forEach(source -> {
314 Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
315 Set<DeviceId> visited = Sets.newHashSet();
316 List<ConnectPoint> currentPath = Lists.newArrayList(source);
Charles Chan0b1dd7e2018-08-19 19:21:46 -0700317 mcastUtils.buildMcastPaths(mcastNextObjStore.asJavaMap(), source.deviceId(), visited, mcastPaths,
Piere99511d2018-04-19 16:47:06 +0200318 currentPath, mcastRoute.group(), source);
319 // Get all the sinks and process them
320 Set<ConnectPoint> sinks = processSinksToBeAdded(source, mcastRoute.group(),
321 mcastRouteData.sinks());
322 // Filter out all the working sinks, we do not want to move them
323 // TODO we need a better way to distinguish flows coming from different sources
324 sinks = sinks.stream()
325 .filter(sink -> !mcastPaths.containsKey(sink) ||
326 !isSinkForSource(mcastRoute.group(), sink, source))
327 .collect(Collectors.toSet());
328 if (sinks.isEmpty()) {
329 log.debug("Skip {} for source {} nothing to do", mcastRoute.group(), source);
330 return;
331 }
piereaddb182020-02-03 13:50:53 +0100332 Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(mcastRoute.group(),
333 source.deviceId(), sinks);
Piere99511d2018-04-19 16:47:06 +0200334 mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink,
335 mcastRoute.group(), paths));
336 });
Pier7b657162018-03-27 11:29:42 -0700337 });
338 } finally {
339 mcastUnlock();
340 }
Charles Chanc91c8782016-03-30 17:54:24 -0700341 }
342
343 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +0100344 * Clean up when deactivating the application.
345 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100346 public void terminate() {
Pier72d0e582018-04-20 14:14:34 +0200347 mcastEventCache.invalidateAll();
Pier Luigi35dab3f2018-01-25 16:16:02 +0100348 executorService.shutdown();
Pier72d0e582018-04-20 14:14:34 +0200349 mcastNextObjStore.destroy();
350 mcastRoleStore.destroy();
Harshada Chaundkar9204f312019-07-02 16:01:24 +0000351 mcastFilteringObjStore.destroy();
Pier72d0e582018-04-20 14:14:34 +0200352 mcastUtils.terminate();
353 log.info("Terminated");
Pier Luigi35dab3f2018-01-25 16:16:02 +0100354 }
355
356 /**
Pier Luigid29ca7c2018-02-28 17:24:03 +0100357 * Processes the SOURCE_ADDED, SOURCE_UPDATED, SINK_ADDED,
Piere99511d2018-04-19 16:47:06 +0200358 * SINK_REMOVED, ROUTE_ADDED and ROUTE_REMOVED events.
Charles Chanc91c8782016-03-30 17:54:24 -0700359 *
360 * @param event McastEvent with SOURCE_ADDED type
361 */
Pier Luigid29ca7c2018-02-28 17:24:03 +0100362 public void processMcastEvent(McastEvent event) {
Pierdb27b8d2018-04-17 16:29:56 +0200363 // If it is a route added, we do not enqueue
364 if (event.type() == ROUTE_ADDED) {
Pierdb27b8d2018-04-17 16:29:56 +0200365 processRouteAddedInternal(event.subject().route().group());
366 } else {
Pierdb27b8d2018-04-17 16:29:56 +0200367 enqueueMcastEvent(event);
368 }
Pier Luigi6786b922018-02-02 16:19:11 +0100369 }
370
371 /**
Piere99511d2018-04-19 16:47:06 +0200372 * Process the SOURCES_ADDED event.
373 *
374 * @param sources the sources connect point
375 * @param mcastIp the group address
376 * @param sinks the sinks connect points
377 */
378 private void processSourcesAddedInternal(Set<ConnectPoint> sources, IpAddress mcastIp,
379 Map<HostId, Set<ConnectPoint>> sinks) {
380 lastMcastChange = Instant.now();
381 mcastLock();
382 try {
piereaddb182020-02-03 13:50:53 +0100383 log.info("Processing sources added {} for group {}", sources, mcastIp);
Piere99511d2018-04-19 16:47:06 +0200384 if (!mcastUtils.isLeader(mcastIp)) {
385 log.debug("Skip {} due to lack of leadership", mcastIp);
386 return;
387 }
388 sources.forEach(source -> {
389 Set<ConnectPoint> sinksToBeAdded = processSinksToBeAdded(source, mcastIp, sinks);
piereaddb182020-02-03 13:50:53 +0100390 Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(mcastIp, source.deviceId(),
391 sinksToBeAdded);
Piere99511d2018-04-19 16:47:06 +0200392 mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink, mcastIp, paths));
393 });
394 } finally {
395 mcastUnlock();
396 }
397 }
398
399 /**
400 * Process the SOURCES_REMOVED event.
401 *
402 * @param sourcesToBeRemoved the source connect points to be removed
403 * @param remainingSources the remainig source connect points
404 * @param mcastIp the group address
405 * @param sinks the sinks connect points
406 */
407 private void processSourcesRemovedInternal(Set<ConnectPoint> sourcesToBeRemoved,
408 Set<ConnectPoint> remainingSources,
409 IpAddress mcastIp,
410 Map<HostId, Set<ConnectPoint>> sinks) {
411 lastMcastChange = Instant.now();
412 mcastLock();
413 try {
piereaddb182020-02-03 13:50:53 +0100414 log.info("Processing sources removed {} for group {}", sourcesToBeRemoved, mcastIp);
Piere99511d2018-04-19 16:47:06 +0200415 if (!mcastUtils.isLeader(mcastIp)) {
416 log.debug("Skip {} due to lack of leadership", mcastIp);
417 return;
418 }
419 if (remainingSources.isEmpty()) {
piereaddb182020-02-03 13:50:53 +0100420 log.debug("There are no more sources for {}", mcastIp);
Piere99511d2018-04-19 16:47:06 +0200421 processRouteRemovedInternal(sourcesToBeRemoved, mcastIp);
422 return;
423 }
424 // Skip offline devices
425 Set<ConnectPoint> candidateSources = sourcesToBeRemoved.stream()
426 .filter(source -> srManager.deviceService.isAvailable(source.deviceId()))
427 .collect(Collectors.toSet());
428 if (candidateSources.isEmpty()) {
429 log.debug("Skip {} due to empty sources to be removed", mcastIp);
430 return;
431 }
Harshada Chaundkar9204f312019-07-02 16:01:24 +0000432 // Let's heal the trees
Piere99511d2018-04-19 16:47:06 +0200433 Set<Link> remainingLinks = Sets.newHashSet();
434 Map<ConnectPoint, Set<Link>> candidateLinks = Maps.newHashMap();
435 Map<ConnectPoint, Set<ConnectPoint>> candidateSinks = Maps.newHashMap();
436 Set<ConnectPoint> totalSources = Sets.newHashSet(candidateSources);
437 totalSources.addAll(remainingSources);
438 // Calculate all the links used by the sources
439 totalSources.forEach(source -> {
440 Set<ConnectPoint> currentSinks = sinks.values()
441 .stream().flatMap(Collection::stream)
442 .filter(sink -> isSinkForSource(mcastIp, sink, source))
443 .collect(Collectors.toSet());
444 candidateSinks.put(source, currentSinks);
445 currentSinks.forEach(currentSink -> {
446 Optional<Path> currentPath = getPath(source.deviceId(), currentSink.deviceId(),
447 mcastIp, null, source);
448 if (currentPath.isPresent()) {
449 if (!candidateSources.contains(source)) {
450 remainingLinks.addAll(currentPath.get().links());
451 } else {
452 candidateLinks.put(source, Sets.newHashSet(currentPath.get().links()));
453 }
454 }
455 });
456 });
457 // Clean transit links
458 candidateLinks.forEach((source, currentCandidateLinks) -> {
459 Set<Link> linksToBeRemoved = Sets.difference(currentCandidateLinks, remainingLinks)
460 .immutableCopy();
461 if (!linksToBeRemoved.isEmpty()) {
462 currentCandidateLinks.forEach(link -> {
463 DeviceId srcLink = link.src().deviceId();
464 // Remove ports only on links to be removed
465 if (linksToBeRemoved.contains(link)) {
466 removePortFromDevice(link.src().deviceId(), link.src().port(), mcastIp,
467 mcastUtils.assignedVlan(srcLink.equals(source.deviceId()) ?
468 source : null));
469 }
470 // Remove role on the candidate links
471 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, srcLink, source));
472 });
473 }
474 });
475 // Clean ingress and egress
476 candidateSources.forEach(source -> {
477 Set<ConnectPoint> currentSinks = candidateSinks.get(source);
478 currentSinks.forEach(currentSink -> {
479 VlanId assignedVlan = mcastUtils.assignedVlan(source.deviceId().equals(currentSink.deviceId()) ?
480 source : null);
481 // Sinks co-located with the source
482 if (source.deviceId().equals(currentSink.deviceId())) {
483 if (source.port().equals(currentSink.port())) {
484 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
485 mcastIp, currentSink, source);
486 return;
487 }
488 // We need to check against the other sources and if it is
489 // necessary remove the port from the device - no overlap
490 Set<VlanId> otherVlans = remainingSources.stream()
491 // Only sources co-located and having this sink
492 .filter(remainingSource -> remainingSource.deviceId()
493 .equals(source.deviceId()) && candidateSinks.get(remainingSource)
494 .contains(currentSink))
495 .map(remainingSource -> mcastUtils.assignedVlan(
496 remainingSource.deviceId().equals(currentSink.deviceId()) ?
497 remainingSource : null)).collect(Collectors.toSet());
498 if (!otherVlans.contains(assignedVlan)) {
499 removePortFromDevice(currentSink.deviceId(), currentSink.port(),
500 mcastIp, assignedVlan);
501 }
502 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, currentSink.deviceId(),
503 source));
504 return;
505 }
506 Set<VlanId> otherVlans = remainingSources.stream()
507 .filter(remainingSource -> candidateSinks.get(remainingSource)
508 .contains(currentSink))
509 .map(remainingSource -> mcastUtils.assignedVlan(
510 remainingSource.deviceId().equals(currentSink.deviceId()) ?
511 remainingSource : null)).collect(Collectors.toSet());
512 // Sinks on other leaves
513 if (!otherVlans.contains(assignedVlan)) {
514 removePortFromDevice(currentSink.deviceId(), currentSink.port(),
515 mcastIp, assignedVlan);
516 }
517 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, currentSink.deviceId(),
518 source));
519 });
520 });
521 } finally {
522 mcastUnlock();
523 }
524 }
525
526 /**
Pierdb27b8d2018-04-17 16:29:56 +0200527 * Process the ROUTE_ADDED event.
Pier Luigie80d6b42018-02-26 12:31:38 +0100528 *
Pierdb27b8d2018-04-17 16:29:56 +0200529 * @param mcastIp the group address
Pier Luigie80d6b42018-02-26 12:31:38 +0100530 */
Pierdb27b8d2018-04-17 16:29:56 +0200531 private void processRouteAddedInternal(IpAddress mcastIp) {
Pier Luigie80d6b42018-02-26 12:31:38 +0100532 lastMcastChange = Instant.now();
533 mcastLock();
534 try {
piereaddb182020-02-03 13:50:53 +0100535 log.info("Processing route added for Multicast group {}", mcastIp);
Pierdb27b8d2018-04-17 16:29:56 +0200536 // Just elect a new leader
537 mcastUtils.isLeader(mcastIp);
Pier Luigie80d6b42018-02-26 12:31:38 +0100538 } finally {
539 mcastUnlock();
540 }
541 }
542
543 /**
Pier Luigi6786b922018-02-02 16:19:11 +0100544 * Removes the entire mcast tree related to this group.
Piere99511d2018-04-19 16:47:06 +0200545 * @param sources the source connect points
Pier Luigi6786b922018-02-02 16:19:11 +0100546 * @param mcastIp multicast group IP address
547 */
Piere99511d2018-04-19 16:47:06 +0200548 private void processRouteRemovedInternal(Set<ConnectPoint> sources, IpAddress mcastIp) {
Pier Luigi6786b922018-02-02 16:19:11 +0100549 lastMcastChange = Instant.now();
550 mcastLock();
551 try {
piereaddb182020-02-03 13:50:53 +0100552 log.info("Processing route removed for group {}", mcastIp);
Pierdb27b8d2018-04-17 16:29:56 +0200553 if (!mcastUtils.isLeader(mcastIp)) {
554 log.debug("Skip {} due to lack of leadership", mcastIp);
555 mcastUtils.withdrawLeader(mcastIp);
556 return;
557 }
Piere99511d2018-04-19 16:47:06 +0200558 sources.forEach(source -> {
559 // Find out the ingress, transit and egress device of the affected group
560 DeviceId ingressDevice = getDevice(mcastIp, INGRESS, source)
561 .stream().findFirst().orElse(null);
562 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
563 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
564 // If there are no egress and transit devices, sinks could be only on the ingress
565 if (!egressDevices.isEmpty()) {
566 egressDevices.forEach(deviceId -> {
567 removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null));
568 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, deviceId, source));
569 });
570 }
571 if (!transitDevices.isEmpty()) {
572 transitDevices.forEach(deviceId -> {
573 removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null));
574 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, deviceId, source));
575 });
576 }
577 if (ingressDevice != null) {
578 removeGroupFromDevice(ingressDevice, mcastIp, mcastUtils.assignedVlan(source));
579 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, ingressDevice, source));
580 }
581 });
582 // Finally, withdraw the leadership
583 mcastUtils.withdrawLeader(mcastIp);
Pier Luigi6786b922018-02-02 16:19:11 +0100584 } finally {
585 mcastUnlock();
586 }
587 }
588
Pier7b657162018-03-27 11:29:42 -0700589 /**
590 * Process sinks to be removed.
591 *
Piere99511d2018-04-19 16:47:06 +0200592 * @param sources the source connect points
Pier7b657162018-03-27 11:29:42 -0700593 * @param mcastIp the ip address of the group
594 * @param newSinks the new sinks to be processed
Pier28164682018-04-17 15:50:43 +0200595 * @param prevSinks the previous sinks
Pier7b657162018-03-27 11:29:42 -0700596 */
Piere99511d2018-04-19 16:47:06 +0200597 private void processSinksRemovedInternal(Set<ConnectPoint> sources, IpAddress mcastIp,
Pier7b657162018-03-27 11:29:42 -0700598 Map<HostId, Set<ConnectPoint>> newSinks,
Pier28164682018-04-17 15:50:43 +0200599 Map<HostId, Set<ConnectPoint>> prevSinks) {
Pier7b657162018-03-27 11:29:42 -0700600 lastMcastChange = Instant.now();
601 mcastLock();
Pier7b657162018-03-27 11:29:42 -0700602 try {
piereaddb182020-02-03 13:50:53 +0100603 log.info("Processing sinks removed for group {} and for sources {}",
604 mcastIp, sources);
Pierdb27b8d2018-04-17 16:29:56 +0200605 if (!mcastUtils.isLeader(mcastIp)) {
606 log.debug("Skip {} due to lack of leadership", mcastIp);
607 return;
608 }
Piere99511d2018-04-19 16:47:06 +0200609 Map<ConnectPoint, Map<ConnectPoint, Optional<Path>>> treesToBeRemoved = Maps.newHashMap();
610 Map<ConnectPoint, Set<ConnectPoint>> treesToBeAdded = Maps.newHashMap();
611 sources.forEach(source -> {
612 // Save the path associated to the sinks to be removed
613 Set<ConnectPoint> sinksToBeRemoved = processSinksToBeRemoved(mcastIp, prevSinks,
614 newSinks, source);
615 Map<ConnectPoint, Optional<Path>> treeToBeRemoved = Maps.newHashMap();
616 sinksToBeRemoved.forEach(sink -> treeToBeRemoved.put(sink, getPath(source.deviceId(),
617 sink.deviceId(), mcastIp,
618 null, source)));
619 treesToBeRemoved.put(source, treeToBeRemoved);
620 // Recover the dual-homed sinks
621 Set<ConnectPoint> sinksToBeRecovered = processSinksToBeRecovered(mcastIp, newSinks,
622 prevSinks, source);
623 treesToBeAdded.put(source, sinksToBeRecovered);
624 });
625 // Remove the sinks taking into account the multiple sources and the original paths
626 treesToBeRemoved.forEach((source, tree) ->
627 tree.forEach((sink, path) -> processSinkRemovedInternal(source, sink, mcastIp, path)));
628 // Add new sinks according to the recovery procedure
629 treesToBeAdded.forEach((source, sinks) ->
630 sinks.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null)));
Pier7b657162018-03-27 11:29:42 -0700631 } finally {
632 mcastUnlock();
Pier7b657162018-03-27 11:29:42 -0700633 }
634 }
635
Pier Luigi6786b922018-02-02 16:19:11 +0100636 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +0100637 * Removes a path from source to sink for given multicast group.
638 *
639 * @param source connect point of the multicast source
640 * @param sink connection point of the multicast sink
641 * @param mcastIp multicast group IP address
Piere99511d2018-04-19 16:47:06 +0200642 * @param mcastPath path associated to the sink
Pier Luigi35dab3f2018-01-25 16:16:02 +0100643 */
644 private void processSinkRemovedInternal(ConnectPoint source, ConnectPoint sink,
Piere99511d2018-04-19 16:47:06 +0200645 IpAddress mcastIp, Optional<Path> mcastPath) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100646 lastMcastChange = Instant.now();
647 mcastLock();
648 try {
piereaddb182020-02-03 13:50:53 +0100649 log.info("Processing sink removed {} for group {} and for source {}", sink, mcastIp, source);
Pier7b657162018-03-27 11:29:42 -0700650 boolean isLast;
Pier Luigi35dab3f2018-01-25 16:16:02 +0100651 // When source and sink are on the same device
652 if (source.deviceId().equals(sink.deviceId())) {
653 // Source and sink are on even the same port. There must be something wrong.
654 if (source.port().equals(sink.port())) {
Piere99511d2018-04-19 16:47:06 +0200655 log.warn("Skip {} since sink {} is on the same port of source {}. Abort", mcastIp, sink, source);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100656 return;
657 }
Pier7b657162018-03-27 11:29:42 -0700658 isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
Pier Luigi92e69be2018-03-02 12:53:37 +0100659 if (isLast) {
Piere99511d2018-04-19 16:47:06 +0200660 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, sink.deviceId(), source));
Pier Luigi92e69be2018-03-02 12:53:37 +0100661 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100662 return;
663 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100664 // Process the egress device
Pier7b657162018-03-27 11:29:42 -0700665 isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100666 if (isLast) {
Piere99511d2018-04-19 16:47:06 +0200667 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, sink.deviceId(), source));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100668 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100669 // If this is the last sink on the device, also update upstream
Pier Luigi35dab3f2018-01-25 16:16:02 +0100670 if (mcastPath.isPresent()) {
671 List<Link> links = Lists.newArrayList(mcastPath.get().links());
672 Collections.reverse(links);
673 for (Link link : links) {
674 if (isLast) {
Piere99511d2018-04-19 16:47:06 +0200675 isLast = removePortFromDevice(link.src().deviceId(), link.src().port(), mcastIp,
676 mcastUtils.assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
Pier Luigi92e69be2018-03-02 12:53:37 +0100677 if (isLast) {
Piere99511d2018-04-19 16:47:06 +0200678 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, link.src().deviceId(), source));
Pier Luigi92e69be2018-03-02 12:53:37 +0100679 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100680 }
Charles Chanc91c8782016-03-30 17:54:24 -0700681 }
682 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100683 } finally {
684 mcastUnlock();
Charles Chanc91c8782016-03-30 17:54:24 -0700685 }
686 }
687
Pier7b657162018-03-27 11:29:42 -0700688
689 /**
690 * Process sinks to be added.
691 *
Piere99511d2018-04-19 16:47:06 +0200692 * @param sources the source connect points
Pier7b657162018-03-27 11:29:42 -0700693 * @param mcastIp the group IP
694 * @param newSinks the new sinks to be processed
695 * @param allPrevSinks all previous sinks
696 */
Piere99511d2018-04-19 16:47:06 +0200697 private void processSinksAddedInternal(Set<ConnectPoint> sources, IpAddress mcastIp,
Pier7b657162018-03-27 11:29:42 -0700698 Map<HostId, Set<ConnectPoint>> newSinks,
699 Set<ConnectPoint> allPrevSinks) {
700 lastMcastChange = Instant.now();
701 mcastLock();
702 try {
piereaddb182020-02-03 13:50:53 +0100703 log.info("Processing sinks added for group {} and for sources {}", mcastIp, sources);
Pierdb27b8d2018-04-17 16:29:56 +0200704 if (!mcastUtils.isLeader(mcastIp)) {
705 log.debug("Skip {} due to lack of leadership", mcastIp);
706 return;
707 }
Piere99511d2018-04-19 16:47:06 +0200708 sources.forEach(source -> {
709 Set<ConnectPoint> sinksToBeAdded = processSinksToBeAdded(source, mcastIp, newSinks);
710 sinksToBeAdded = Sets.difference(sinksToBeAdded, allPrevSinks);
711 sinksToBeAdded.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null));
712 });
Pier7b657162018-03-27 11:29:42 -0700713 } finally {
714 mcastUnlock();
715 }
716 }
717
Charles Chanc91c8782016-03-30 17:54:24 -0700718 /**
719 * Establishes a path from source to sink for given multicast group.
720 *
721 * @param source connect point of the multicast source
722 * @param sink connection point of the multicast sink
723 * @param mcastIp multicast group IP address
724 */
725 private void processSinkAddedInternal(ConnectPoint source, ConnectPoint sink,
Pier7b657162018-03-27 11:29:42 -0700726 IpAddress mcastIp, List<Path> allPaths) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100727 lastMcastChange = Instant.now();
728 mcastLock();
729 try {
piereaddb182020-02-03 13:50:53 +0100730 log.info("Processing sink added {} for group {} and for source {}", sink, mcastIp, source);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100731 // Process the ingress device
Harshada Chaundkar9204f312019-07-02 16:01:24 +0000732 McastFilteringObjStoreKey mcastFilterObjStoreKey = new McastFilteringObjStoreKey(source,
733 mcastUtils.assignedVlan(source), mcastIp.isIp4());
734 addFilterToDevice(mcastFilterObjStoreKey, mcastIp, INGRESS);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100735 if (source.deviceId().equals(sink.deviceId())) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100736 if (source.port().equals(sink.port())) {
737 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
738 mcastIp, sink, source);
739 return;
740 }
Pier7b657162018-03-27 11:29:42 -0700741 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
Piere99511d2018-04-19 16:47:06 +0200742 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, sink.deviceId(), source), INGRESS);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100743 return;
744 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100745 // Find a path. If present, create/update groups and flows for each hop
Piere99511d2018-04-19 16:47:06 +0200746 Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp, allPaths, source);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100747 if (mcastPath.isPresent()) {
748 List<Link> links = mcastPath.get().links();
Pier1a7e0c02018-03-12 15:00:54 -0700749 // Setup mcast role for ingress
Piere99511d2018-04-19 16:47:06 +0200750 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, source.deviceId(), source), INGRESS);
751 // Setup properly the transit forwarding
Pier Luigi35dab3f2018-01-25 16:16:02 +0100752 links.forEach(link -> {
753 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
Pier7b657162018-03-27 11:29:42 -0700754 mcastUtils.assignedVlan(link.src().deviceId()
755 .equals(source.deviceId()) ? source : null));
Harshada Chaundkar9204f312019-07-02 16:01:24 +0000756 McastFilteringObjStoreKey filteringKey = new McastFilteringObjStoreKey(link.dst(),
757 mcastUtils.assignedVlan(null), mcastIp.isIp4());
758 addFilterToDevice(filteringKey, mcastIp, null);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100759 });
Pier1a7e0c02018-03-12 15:00:54 -0700760 // Setup mcast role for the transit
761 links.stream()
762 .filter(link -> !link.dst().deviceId().equals(sink.deviceId()))
Harshada Chaundkar9204f312019-07-02 16:01:24 +0000763 .forEach(link -> {
764 log.trace("Transit links {}", link);
765 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, link.dst().deviceId(),
766 source), TRANSIT);
767 });
Pier Luigi35dab3f2018-01-25 16:16:02 +0100768 // Process the egress device
Pier7b657162018-03-27 11:29:42 -0700769 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
Pier1a7e0c02018-03-12 15:00:54 -0700770 // Setup mcast role for egress
Piere99511d2018-04-19 16:47:06 +0200771 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, sink.deviceId(), source), EGRESS);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100772 } else {
Piere99511d2018-04-19 16:47:06 +0200773 log.warn("Unable to find a path from {} to {}. Abort sinkAdded", source.deviceId(), sink.deviceId());
Pier Luigi35dab3f2018-01-25 16:16:02 +0100774 }
775 } finally {
776 mcastUnlock();
Charles Chanc91c8782016-03-30 17:54:24 -0700777 }
778 }
779
780 /**
Harshada Chaundkar9204f312019-07-02 16:01:24 +0000781 * Processes PORT_UPDATED event.
782 *
783 * @param affectedDevice Affected device
784 * @param affectedPort Affected port
785 */
786 public void processPortUpdate(Device affectedDevice, Port affectedPort) {
787 // Clean the filtering obj store. Edge port case.
788 ConnectPoint portDown = new ConnectPoint(affectedDevice.id(), affectedPort.number());
789 if (!affectedPort.isEnabled()) {
790 updateFilterObjStoreByPort(portDown);
791 }
792 }
793
794 /**
Charles Chan72779502016-04-23 17:36:10 -0700795 * Processes the LINK_DOWN event.
796 *
piereaddb182020-02-03 13:50:53 +0100797 * @param linkDown Link that is going down
Charles Chan72779502016-04-23 17:36:10 -0700798 */
piereaddb182020-02-03 13:50:53 +0100799 public void processLinkDown(Link linkDown) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100800 lastMcastChange = Instant.now();
801 mcastLock();
802 try {
Harshada Chaundkar9204f312019-07-02 16:01:24 +0000803 // Get mcast groups affected by the link going down
piereaddb182020-02-03 13:50:53 +0100804 Set<IpAddress> affectedGroups = getAffectedGroups(linkDown);
805 log.info("Processing link down {} for groups {}", linkDown, affectedGroups);
806 affectedGroups.forEach(mcastIp -> {
807 log.debug("Processing link down {} for group {}", linkDown, mcastIp);
808 recoverFailure(mcastIp, linkDown);
Charles Chan72779502016-04-23 17:36:10 -0700809 });
Pier Luigi35dab3f2018-01-25 16:16:02 +0100810 } finally {
811 mcastUnlock();
812 }
Charles Chan72779502016-04-23 17:36:10 -0700813 }
814
815 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +0100816 * Process the DEVICE_DOWN event.
817 *
818 * @param deviceDown device going down
819 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100820 public void processDeviceDown(DeviceId deviceDown) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100821 lastMcastChange = Instant.now();
822 mcastLock();
823 try {
824 // Get the mcast groups affected by the device going down
piereaddb182020-02-03 13:50:53 +0100825 Set<IpAddress> affectedGroups = getAffectedGroups(deviceDown);
826 log.info("Processing device down {} for groups {}", deviceDown, affectedGroups);
Harshada Chaundkar9204f312019-07-02 16:01:24 +0000827 updateFilterObjStoreByDevice(deviceDown);
piereaddb182020-02-03 13:50:53 +0100828 affectedGroups.forEach(mcastIp -> {
Piere99511d2018-04-19 16:47:06 +0200829 log.debug("Processing device down {} for group {}", deviceDown, mcastIp);
830 recoverFailure(mcastIp, deviceDown);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100831 });
832 } finally {
833 mcastUnlock();
834 }
Pier Luigi580fd8a2018-01-16 10:47:50 +0100835 }
836
837 /**
Piere99511d2018-04-19 16:47:06 +0200838 * General failure recovery procedure.
839 *
840 * @param mcastIp the group to recover
841 * @param failedElement the failed element
842 */
843 private void recoverFailure(IpAddress mcastIp, Object failedElement) {
844 // TODO Optimize when the group editing is in place
845 if (!mcastUtils.isLeader(mcastIp)) {
846 log.debug("Skip {} due to lack of leadership", mcastIp);
847 return;
848 }
849 // Do not proceed if the sources of this group are missing
850 Set<ConnectPoint> sources = getSources(mcastIp);
851 if (sources.isEmpty()) {
852 log.warn("Missing sources for group {}", mcastIp);
853 return;
854 }
855 // Find out the ingress devices of the affected group
856 // If sinks are in other leafs, we have ingress, transit, egress, and source
857 // If sinks are in the same leaf, we have just ingress and source
858 Set<DeviceId> ingressDevices = getDevice(mcastIp, INGRESS);
859 if (ingressDevices.isEmpty()) {
piereaddb182020-02-03 13:50:53 +0100860 log.warn("Missing ingress devices for group {}", mcastIp);
Piere99511d2018-04-19 16:47:06 +0200861 return;
862 }
863 // For each tree, delete ingress-transit part
864 sources.forEach(source -> {
865 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
866 transitDevices.forEach(transitDevice -> {
867 removeGroupFromDevice(transitDevice, mcastIp, mcastUtils.assignedVlan(null));
868 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, transitDevice, source));
869 });
870 });
871 removeIngressTransitPorts(mcastIp, ingressDevices, sources);
872 // TODO Evaluate the possibility of building optimize trees between sources
873 Map<DeviceId, Set<ConnectPoint>> notRecovered = Maps.newHashMap();
874 sources.forEach(source -> {
875 Set<DeviceId> notRecoveredInternal = Sets.newHashSet();
876 DeviceId ingressDevice = ingressDevices.stream()
877 .filter(deviceId -> deviceId.equals(source.deviceId())).findFirst().orElse(null);
878 // Clean also the ingress
879 if (failedElement instanceof DeviceId && ingressDevice.equals(failedElement)) {
880 removeGroupFromDevice((DeviceId) failedElement, mcastIp, mcastUtils.assignedVlan(source));
881 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, (DeviceId) failedElement, source));
882 }
883 if (ingressDevice == null) {
884 log.warn("Skip failure recovery - " +
885 "Missing ingress for source {} and group {}", source, mcastIp);
886 return;
887 }
888 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
piereaddb182020-02-03 13:50:53 +0100889 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(mcastIp, ingressDevice, egressDevices);
Piere99511d2018-04-19 16:47:06 +0200890 // We have to verify, if there are egresses without paths
891 mcastTree.forEach((egressDevice, paths) -> {
892 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
893 mcastIp, paths, source);
894 // No paths, we have to try with alternative location
895 if (!mcastPath.isPresent()) {
896 notRecovered.compute(egressDevice, (deviceId, listSources) -> {
897 listSources = listSources == null ? Sets.newHashSet() : listSources;
898 listSources.add(source);
899 return listSources;
900 });
901 notRecoveredInternal.add(egressDevice);
902 }
903 });
904 // Fast path, we can recover all the locations
905 if (notRecoveredInternal.isEmpty()) {
906 mcastTree.forEach((egressDevice, paths) -> {
Charles Chanba59dd62018-05-10 22:19:49 +0000907 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
908 mcastIp, paths, source);
909 if (mcastPath.isPresent()) {
910 installPath(mcastIp, source, mcastPath.get());
911 }
Piere99511d2018-04-19 16:47:06 +0200912 });
913 } else {
914 // Let's try to recover using alternative locations
915 recoverSinks(egressDevices, notRecoveredInternal, mcastIp,
916 ingressDevice, source);
917 }
918 });
919 // Finally remove the egresses not recovered
920 notRecovered.forEach((egressDevice, listSources) -> {
921 Set<ConnectPoint> currentSources = getSources(mcastIp, egressDevice, EGRESS);
922 if (Objects.equal(currentSources, listSources)) {
923 log.warn("Fail to recover egress device {} from {} failure {}",
924 egressDevice, failedElement instanceof Link ? "Link" : "Device", failedElement);
925 removeGroupFromDevice(egressDevice, mcastIp, mcastUtils.assignedVlan(null));
926 }
927 listSources.forEach(source -> mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, egressDevice, source)));
928 });
929 }
930
931 /**
Pier7b657162018-03-27 11:29:42 -0700932 * Try to recover sinks using alternate locations.
933 *
934 * @param egressDevices the original egress devices
935 * @param notRecovered the devices not recovered
936 * @param mcastIp the group address
937 * @param ingressDevice the ingress device
938 * @param source the source connect point
Pier7b657162018-03-27 11:29:42 -0700939 */
940 private void recoverSinks(Set<DeviceId> egressDevices, Set<DeviceId> notRecovered,
Piere99511d2018-04-19 16:47:06 +0200941 IpAddress mcastIp, DeviceId ingressDevice, ConnectPoint source) {
942 log.debug("Processing recover sinks for group {} and for source {}",
943 mcastIp, source);
Pier7b657162018-03-27 11:29:42 -0700944 Set<DeviceId> recovered = Sets.difference(egressDevices, notRecovered);
Pier7b657162018-03-27 11:29:42 -0700945 Set<ConnectPoint> totalAffectedSinks = Sets.newHashSet();
Pier7b657162018-03-27 11:29:42 -0700946 Set<ConnectPoint> totalSinks = Sets.newHashSet();
947 // Let's compute all the affected sinks and all the sinks
948 notRecovered.forEach(deviceId -> {
949 totalAffectedSinks.addAll(
Charles Chanba59dd62018-05-10 22:19:49 +0000950 mcastUtils.getAffectedSinks(deviceId, mcastIp).values().stream()
951 .flatMap(Collection::stream)
Pier7b657162018-03-27 11:29:42 -0700952 .filter(connectPoint -> connectPoint.deviceId().equals(deviceId))
Charles Chanba59dd62018-05-10 22:19:49 +0000953 .collect(Collectors.toSet())
954 );
Pier7b657162018-03-27 11:29:42 -0700955 totalSinks.addAll(
Piere99511d2018-04-19 16:47:06 +0200956 mcastUtils.getAffectedSinks(deviceId, mcastIp).values().stream()
Charles Chanba59dd62018-05-10 22:19:49 +0000957 .flatMap(Collection::stream).collect(Collectors.toSet())
958 );
Pier7b657162018-03-27 11:29:42 -0700959 });
Pier7b657162018-03-27 11:29:42 -0700960 Set<ConnectPoint> sinksToBeAdded = Sets.difference(totalSinks, totalAffectedSinks);
Piere99511d2018-04-19 16:47:06 +0200961 Set<DeviceId> newEgressDevices = sinksToBeAdded.stream()
962 .map(ConnectPoint::deviceId).collect(Collectors.toSet());
963 newEgressDevices.addAll(recovered);
964 Set<DeviceId> copyNewEgressDevices = ImmutableSet.copyOf(newEgressDevices);
965 newEgressDevices = newEgressDevices.stream()
966 .filter(deviceId -> !deviceId.equals(ingressDevice)).collect(Collectors.toSet());
piereaddb182020-02-03 13:50:53 +0100967 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(mcastIp, ingressDevice, newEgressDevices);
Pier7b657162018-03-27 11:29:42 -0700968 // if the source was originally in the new locations, add new sinks
Piere99511d2018-04-19 16:47:06 +0200969 if (copyNewEgressDevices.contains(ingressDevice)) {
Pier7b657162018-03-27 11:29:42 -0700970 sinksToBeAdded.stream()
971 .filter(connectPoint -> connectPoint.deviceId().equals(ingressDevice))
972 .forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, ImmutableList.of()));
973 }
Pier7b657162018-03-27 11:29:42 -0700974 // Construct a new path for each egress device
975 mcastTree.forEach((egressDevice, paths) -> {
Piere99511d2018-04-19 16:47:06 +0200976 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice, mcastIp, paths, source);
Pier7b657162018-03-27 11:29:42 -0700977 if (mcastPath.isPresent()) {
978 // Using recovery procedure
979 if (recovered.contains(egressDevice)) {
980 installPath(mcastIp, source, mcastPath.get());
981 } else {
982 // otherwise we need to threat as new sink
983 sinksToBeAdded.stream()
984 .filter(connectPoint -> connectPoint.deviceId().equals(egressDevice))
985 .forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, paths));
986 }
Pier7b657162018-03-27 11:29:42 -0700987 }
988 });
Pier7b657162018-03-27 11:29:42 -0700989 }
990
991 /**
Pier28164682018-04-17 15:50:43 +0200992 * Process all the sinks related to a mcast group and return
993 * the ones to be removed.
994 *
995 * @param mcastIp the group address
996 * @param prevsinks the previous sinks to be evaluated
997 * @param newSinks the new sinks to be evaluted
Piere99511d2018-04-19 16:47:06 +0200998 * @param source the source connect point
Pier28164682018-04-17 15:50:43 +0200999 * @return the set of the sinks to be removed
1000 */
1001 private Set<ConnectPoint> processSinksToBeRemoved(IpAddress mcastIp,
1002 Map<HostId, Set<ConnectPoint>> prevsinks,
Piere99511d2018-04-19 16:47:06 +02001003 Map<HostId, Set<ConnectPoint>> newSinks,
1004 ConnectPoint source) {
Pier28164682018-04-17 15:50:43 +02001005 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
piereaddb182020-02-03 13:50:53 +01001006 log.debug("Processing sinks to be removed for Multicast group {}, source {}",
1007 mcastIp, source);
Pier28164682018-04-17 15:50:43 +02001008 prevsinks.forEach(((hostId, connectPoints) -> {
Shekhar Aryan27bbe2a2019-06-20 14:03:07 +00001009 if (Objects.equal(HostId.NONE, hostId)) {
Esin Karamanf1f46e32019-03-05 13:49:02 +00001010 //in this case connect points are single homed sinks.
1011 //just found the difference btw previous and new sinks for this source.
1012 Set<ConnectPoint> difference = Sets.difference(connectPoints, newSinks.get(hostId));
1013 sinksToBeProcessed.addAll(difference);
1014 return;
1015 }
Pier28164682018-04-17 15:50:43 +02001016 // We have to check with the existing flows
1017 ConnectPoint sinkToBeProcessed = connectPoints.stream()
Piere99511d2018-04-19 16:47:06 +02001018 .filter(connectPoint -> isSinkForSource(mcastIp, connectPoint, source))
Pier28164682018-04-17 15:50:43 +02001019 .findFirst().orElse(null);
1020 if (sinkToBeProcessed != null) {
1021 // If the host has been removed or location has been removed
1022 if (!newSinks.containsKey(hostId) ||
1023 !newSinks.get(hostId).contains(sinkToBeProcessed)) {
1024 sinksToBeProcessed.add(sinkToBeProcessed);
1025 }
1026 }
1027 }));
1028 // We have done, return the set
1029 return sinksToBeProcessed;
1030 }
1031
1032 /**
Pier7b657162018-03-27 11:29:42 -07001033 * Process new locations and return the set of sinks to be added
1034 * in the context of the recovery.
1035 *
Pier28164682018-04-17 15:50:43 +02001036 * @param newSinks the remaining sinks
1037 * @param prevSinks the previous sinks
Piere99511d2018-04-19 16:47:06 +02001038 * @param source the source connect point
Pier7b657162018-03-27 11:29:42 -07001039 * @return the set of the sinks to be processed
1040 */
Charles Chanba59dd62018-05-10 22:19:49 +00001041 private Set<ConnectPoint> processSinksToBeRecovered(IpAddress mcastIp,
1042 Map<HostId, Set<ConnectPoint>> newSinks,
Piere99511d2018-04-19 16:47:06 +02001043 Map<HostId, Set<ConnectPoint>> prevSinks,
1044 ConnectPoint source) {
Pier7b657162018-03-27 11:29:42 -07001045 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
piereaddb182020-02-03 13:50:53 +01001046 log.debug("Processing sinks to be recovered for Multicast group {}, source {}",
1047 mcastIp, source);
Pier28164682018-04-17 15:50:43 +02001048 newSinks.forEach((hostId, connectPoints) -> {
Pier7b657162018-03-27 11:29:42 -07001049 // If it has more than 1 locations
1050 if (connectPoints.size() > 1 || connectPoints.size() == 0) {
1051 log.debug("Skip {} since sink {} has {} locations",
1052 mcastIp, hostId, connectPoints.size());
1053 return;
1054 }
Pier28164682018-04-17 15:50:43 +02001055 // If previously it had two locations, we need to recover it
1056 // Filter out if the remaining location is already served
1057 if (prevSinks.containsKey(hostId) && prevSinks.get(hostId).size() == 2) {
Pier665b0fc2018-04-19 15:53:20 +02001058 ConnectPoint sinkToBeProcessed = connectPoints.stream()
Piere99511d2018-04-19 16:47:06 +02001059 .filter(connectPoint -> !isSinkForSource(mcastIp, connectPoint, source))
Pier665b0fc2018-04-19 15:53:20 +02001060 .findFirst().orElse(null);
1061 if (sinkToBeProcessed != null) {
1062 sinksToBeProcessed.add(sinkToBeProcessed);
1063 }
Pier28164682018-04-17 15:50:43 +02001064 }
Pier7b657162018-03-27 11:29:42 -07001065 });
1066 return sinksToBeProcessed;
1067 }
1068
1069 /**
1070 * Process all the sinks related to a mcast group and return
1071 * the ones to be processed.
1072 *
1073 * @param source the source connect point
1074 * @param mcastIp the group address
1075 * @param sinks the sinks to be evaluated
1076 * @return the set of the sinks to be processed
1077 */
1078 private Set<ConnectPoint> processSinksToBeAdded(ConnectPoint source, IpAddress mcastIp,
1079 Map<HostId, Set<ConnectPoint>> sinks) {
Pier7b657162018-03-27 11:29:42 -07001080 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
piereaddb182020-02-03 13:50:53 +01001081 log.debug("Processing sinks to be added for Multicast group {}, source {}",
1082 mcastIp, source);
Pier7b657162018-03-27 11:29:42 -07001083 sinks.forEach(((hostId, connectPoints) -> {
Esin Karamanf1f46e32019-03-05 13:49:02 +00001084 //add all connect points that are not tied with any host
Shekhar Aryan27bbe2a2019-06-20 14:03:07 +00001085 if (Objects.equal(HostId.NONE, hostId)) {
Esin Karamanf1f46e32019-03-05 13:49:02 +00001086 sinksToBeProcessed.addAll(connectPoints);
1087 return;
1088 }
Pier7b657162018-03-27 11:29:42 -07001089 // If it has more than 2 locations
1090 if (connectPoints.size() > 2 || connectPoints.size() == 0) {
1091 log.debug("Skip {} since sink {} has {} locations",
1092 mcastIp, hostId, connectPoints.size());
1093 return;
1094 }
1095 // If it has one location, just use it
1096 if (connectPoints.size() == 1) {
Piere99511d2018-04-19 16:47:06 +02001097 sinksToBeProcessed.add(connectPoints.stream().findFirst().orElse(null));
Pier7b657162018-03-27 11:29:42 -07001098 return;
1099 }
1100 // We prefer to reuse existing flows
1101 ConnectPoint sinkToBeProcessed = connectPoints.stream()
Piere99511d2018-04-19 16:47:06 +02001102 .filter(connectPoint -> {
1103 if (!isSinkForGroup(mcastIp, connectPoint, source)) {
1104 return false;
1105 }
1106 if (!isSinkReachable(mcastIp, connectPoint, source)) {
1107 return false;
1108 }
1109 ConnectPoint other = connectPoints.stream()
Charles Chanba59dd62018-05-10 22:19:49 +00001110 .filter(remaining -> !remaining.equals(connectPoint))
1111 .findFirst().orElse(null);
Piere99511d2018-04-19 16:47:06 +02001112 // We are already serving the sink
1113 return !isSinkForSource(mcastIp, other, source);
1114 }).findFirst().orElse(null);
1115
Pier7b657162018-03-27 11:29:42 -07001116 if (sinkToBeProcessed != null) {
1117 sinksToBeProcessed.add(sinkToBeProcessed);
1118 return;
1119 }
1120 // Otherwise we prefer to reuse existing egresses
Piere99511d2018-04-19 16:47:06 +02001121 Set<DeviceId> egresses = getDevice(mcastIp, EGRESS, source);
Pier7b657162018-03-27 11:29:42 -07001122 sinkToBeProcessed = connectPoints.stream()
Piere99511d2018-04-19 16:47:06 +02001123 .filter(connectPoint -> {
1124 if (!egresses.contains(connectPoint.deviceId())) {
1125 return false;
1126 }
1127 if (!isSinkReachable(mcastIp, connectPoint, source)) {
1128 return false;
1129 }
1130 ConnectPoint other = connectPoints.stream()
Charles Chanba59dd62018-05-10 22:19:49 +00001131 .filter(remaining -> !remaining.equals(connectPoint))
1132 .findFirst().orElse(null);
Piere99511d2018-04-19 16:47:06 +02001133 return !isSinkForSource(mcastIp, other, source);
1134 }).findFirst().orElse(null);
Pier7b657162018-03-27 11:29:42 -07001135 if (sinkToBeProcessed != null) {
1136 sinksToBeProcessed.add(sinkToBeProcessed);
1137 return;
1138 }
1139 // Otherwise we prefer a location co-located with the source (if it exists)
1140 sinkToBeProcessed = connectPoints.stream()
1141 .filter(connectPoint -> connectPoint.deviceId().equals(source.deviceId()))
1142 .findFirst().orElse(null);
1143 if (sinkToBeProcessed != null) {
1144 sinksToBeProcessed.add(sinkToBeProcessed);
1145 return;
1146 }
Piere99511d2018-04-19 16:47:06 +02001147 // Finally, we randomly pick a new location if it is reachable
1148 sinkToBeProcessed = connectPoints.stream()
1149 .filter(connectPoint -> {
1150 if (!isSinkReachable(mcastIp, connectPoint, source)) {
1151 return false;
1152 }
1153 ConnectPoint other = connectPoints.stream()
Charles Chanba59dd62018-05-10 22:19:49 +00001154 .filter(remaining -> !remaining.equals(connectPoint))
1155 .findFirst().orElse(null);
Piere99511d2018-04-19 16:47:06 +02001156 return !isSinkForSource(mcastIp, other, source);
1157 }).findFirst().orElse(null);
1158 if (sinkToBeProcessed != null) {
1159 sinksToBeProcessed.add(sinkToBeProcessed);
1160 }
Pier7b657162018-03-27 11:29:42 -07001161 }));
Pier7b657162018-03-27 11:29:42 -07001162 return sinksToBeProcessed;
1163 }
1164
1165 /**
Pier1a7e0c02018-03-12 15:00:54 -07001166 * Utility method to remove all the ingress transit ports.
1167 *
1168 * @param mcastIp the group ip
Piere99511d2018-04-19 16:47:06 +02001169 * @param ingressDevices the ingress devices
1170 * @param sources the source connect points
Pier1a7e0c02018-03-12 15:00:54 -07001171 */
Piere99511d2018-04-19 16:47:06 +02001172 private void removeIngressTransitPorts(IpAddress mcastIp, Set<DeviceId> ingressDevices,
1173 Set<ConnectPoint> sources) {
1174 Map<ConnectPoint, Set<PortNumber>> ingressTransitPorts = Maps.newHashMap();
1175 sources.forEach(source -> {
1176 DeviceId ingressDevice = ingressDevices.stream()
1177 .filter(deviceId -> deviceId.equals(source.deviceId()))
1178 .findFirst().orElse(null);
1179 if (ingressDevice == null) {
1180 log.warn("Skip removeIngressTransitPorts - " +
1181 "Missing ingress for source {} and group {}",
1182 source, mcastIp);
1183 return;
Pier1a7e0c02018-03-12 15:00:54 -07001184 }
Andrea Campanella5b4cd652018-06-05 14:19:21 +02001185 Set<PortNumber> ingressTransitPort = ingressTransitPort(mcastIp, ingressDevice, source);
1186 if (ingressTransitPort.isEmpty()) {
1187 log.warn("No transit ports to remove on device {}", ingressDevice);
1188 return;
1189 }
1190 ingressTransitPorts.put(source, ingressTransitPort);
Pier1a7e0c02018-03-12 15:00:54 -07001191 });
Piere99511d2018-04-19 16:47:06 +02001192 ingressTransitPorts.forEach((source, ports) -> ports.forEach(ingressTransitPort -> {
1193 DeviceId ingressDevice = ingressDevices.stream()
Charles Chanba59dd62018-05-10 22:19:49 +00001194 .filter(deviceId -> deviceId.equals(source.deviceId()))
1195 .findFirst().orElse(null);
Piere99511d2018-04-19 16:47:06 +02001196 boolean isLast = removePortFromDevice(ingressDevice, ingressTransitPort,
1197 mcastIp, mcastUtils.assignedVlan(source));
1198 if (isLast) {
1199 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, ingressDevice, source));
1200 }
1201 }));
Pier1a7e0c02018-03-12 15:00:54 -07001202 }
1203
1204 /**
Charles Chanc91c8782016-03-30 17:54:24 -07001205 * Adds a port to given multicast group on given device. This involves the
1206 * update of L3 multicast group and multicast routing table entry.
1207 *
1208 * @param deviceId device ID
1209 * @param port port to be added
1210 * @param mcastIp multicast group
1211 * @param assignedVlan assigned VLAN ID
1212 */
Charles Chanba59dd62018-05-10 22:19:49 +00001213 private void addPortToDevice(DeviceId deviceId, PortNumber port,
1214 IpAddress mcastIp, VlanId assignedVlan) {
Piere99511d2018-04-19 16:47:06 +02001215 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId, assignedVlan);
Charles Chanc91c8782016-03-30 17:54:24 -07001216 ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
Pier Luigi4f0dd212018-01-19 10:24:53 +01001217 NextObjective newNextObj;
Charles Chan72779502016-04-23 17:36:10 -07001218 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Charles Chanc91c8782016-03-30 17:54:24 -07001219 // First time someone request this mcast group via this device
1220 portBuilder.add(port);
Pier Luigi4f0dd212018-01-19 10:24:53 +01001221 // New nextObj
Vignesh Ethiraj75790122019-08-26 12:18:42 +00001222 if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
1223 log.debug("Passing 0 as nextId for unconfigured device {}", deviceId);
1224 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
1225 portBuilder.build(), 0).add();
1226 } else {
1227 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
1228 portBuilder.build(), null).add();
1229 }
Pier Luigi4f0dd212018-01-19 10:24:53 +01001230 // Store the new port
1231 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Harshada Chaundkar9204f312019-07-02 16:01:24 +00001232 // Create, store and apply the new nextObj and fwdObj
1233 ObjectiveContext context = new DefaultObjectiveContext(
1234 (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}",
1235 mcastIp, deviceId, port.toLong(), assignedVlan),
1236 (objective, error) -> {
1237 log.warn("Failed to add {} on {}/{}, vlan {}: {}",
1238 mcastIp, deviceId, port.toLong(), assignedVlan, error);
1239 srManager.invalidateNextObj(objective.id());
1240 });
1241 ForwardingObjective fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan,
1242 newNextObj.id()).add(context);
1243 if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
1244 log.debug("skip next and forward flowobjective addition for device: {}", deviceId);
1245 } else {
1246 srManager.flowObjectiveService.next(deviceId, newNextObj);
1247 srManager.flowObjectiveService.forward(deviceId, fwdObj);
1248 }
Charles Chanc91c8782016-03-30 17:54:24 -07001249 } else {
1250 // This device already serves some subscribers of this mcast group
Charles Chan72779502016-04-23 17:36:10 -07001251 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chanc91c8782016-03-30 17:54:24 -07001252 // Stop if the port is already in the nextobj
Pier7b657162018-03-27 11:29:42 -07001253 Set<PortNumber> existingPorts = mcastUtils.getPorts(nextObj.next());
Charles Chanc91c8782016-03-30 17:54:24 -07001254 if (existingPorts.contains(port)) {
piereaddb182020-02-03 13:50:53 +01001255 log.debug("Port {}/{} already exists for {}. Abort", deviceId, port, mcastIp);
Charles Chanc91c8782016-03-30 17:54:24 -07001256 return;
1257 }
Pier Luigi4f0dd212018-01-19 10:24:53 +01001258 // Let's add the port and reuse the previous one
Yuta HIGUCHIbef07b52018-02-09 18:05:23 -08001259 portBuilder.addAll(existingPorts).add(port);
Pier Luigi4f0dd212018-01-19 10:24:53 +01001260 // Reuse previous nextObj
Pier7b657162018-03-27 11:29:42 -07001261 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi4f0dd212018-01-19 10:24:53 +01001262 portBuilder.build(), nextObj.id()).addToExisting();
1263 // Store the final next objective and send only the difference to the driver
1264 mcastNextObjStore.put(mcastStoreKey, newNextObj);
1265 // Add just the new port
1266 portBuilder = ImmutableSet.builder();
1267 portBuilder.add(port);
Pier7b657162018-03-27 11:29:42 -07001268 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi4f0dd212018-01-19 10:24:53 +01001269 portBuilder.build(), nextObj.id()).addToExisting();
Harshada Chaundkar9204f312019-07-02 16:01:24 +00001270 if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
1271 log.debug("skip next flowobjective update for device: {}", deviceId);
1272 } else {
1273 // no need to update the flow here since we have updated the nextobjective/group
1274 // the existing flow will keep pointing to the updated nextobj
1275 srManager.flowObjectiveService.next(deviceId, newNextObj);
1276 }
Vignesh Ethiraj75790122019-08-26 12:18:42 +00001277 }
Charles Chanc91c8782016-03-30 17:54:24 -07001278 }
1279
1280 /**
1281 * Removes a port from given multicast group on given device.
1282 * This involves the update of L3 multicast group and multicast routing
1283 * table entry.
1284 *
1285 * @param deviceId device ID
1286 * @param port port to be added
1287 * @param mcastIp multicast group
1288 * @param assignedVlan assigned VLAN ID
1289 * @return true if this is the last sink on this device
1290 */
Charles Chanba59dd62018-05-10 22:19:49 +00001291 private boolean removePortFromDevice(DeviceId deviceId, PortNumber port,
1292 IpAddress mcastIp, VlanId assignedVlan) {
Charles Chan72779502016-04-23 17:36:10 -07001293 McastStoreKey mcastStoreKey =
Piere99511d2018-04-19 16:47:06 +02001294 new McastStoreKey(mcastIp, deviceId, assignedVlan);
Charles Chanc91c8782016-03-30 17:54:24 -07001295 // This device is not serving this multicast group
Charles Chan72779502016-04-23 17:36:10 -07001296 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Piere99511d2018-04-19 16:47:06 +02001297 return true;
Charles Chanc91c8782016-03-30 17:54:24 -07001298 }
Charles Chan72779502016-04-23 17:36:10 -07001299 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Pier7b657162018-03-27 11:29:42 -07001300 Set<PortNumber> existingPorts = mcastUtils.getPorts(nextObj.next());
Charles Chan72779502016-04-23 17:36:10 -07001301 // This port does not serve this multicast group
Charles Chanc91c8782016-03-30 17:54:24 -07001302 if (!existingPorts.contains(port)) {
Piere99511d2018-04-19 16:47:06 +02001303 if (!existingPorts.isEmpty()) {
piereaddb182020-02-03 13:50:53 +01001304 log.debug("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
Piere99511d2018-04-19 16:47:06 +02001305 return false;
1306 }
1307 return true;
Charles Chanc91c8782016-03-30 17:54:24 -07001308 }
1309 // Copy and modify the ImmutableSet
1310 existingPorts = Sets.newHashSet(existingPorts);
1311 existingPorts.remove(port);
Charles Chanc91c8782016-03-30 17:54:24 -07001312 NextObjective newNextObj;
Pier Luigi8cd46de2018-01-19 10:24:53 +01001313 ObjectiveContext context;
Charles Chanc91c8782016-03-30 17:54:24 -07001314 ForwardingObjective fwdObj;
1315 if (existingPorts.isEmpty()) {
Pier Luigi8cd46de2018-01-19 10:24:53 +01001316 context = new DefaultObjectiveContext(
Charles Chan72779502016-04-23 17:36:10 -07001317 (objective) -> log.debug("Successfully remove {} on {}/{}, vlan {}",
1318 mcastIp, deviceId, port.toLong(), assignedVlan),
Piere99511d2018-04-19 16:47:06 +02001319 (objective, error) -> log.warn("Failed to remove {} on {}/{}, vlan {}: {}",
Charles Chan72779502016-04-23 17:36:10 -07001320 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Pier7b657162018-03-27 11:29:42 -07001321 fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
Harshada Chaundkar9204f312019-07-02 16:01:24 +00001322 if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
1323 log.debug("skip forward flowobjective removal for device: {}", deviceId);
1324 } else {
1325 srManager.flowObjectiveService.forward(deviceId, fwdObj);
1326 }
Charles Chan72779502016-04-23 17:36:10 -07001327 mcastNextObjStore.remove(mcastStoreKey);
Charles Chanc91c8782016-03-30 17:54:24 -07001328 } else {
Pier Luigi8cd46de2018-01-19 10:24:53 +01001329 // Here we store the next objective with the remaining port
Pier7b657162018-03-27 11:29:42 -07001330 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi8cd46de2018-01-19 10:24:53 +01001331 existingPorts, nextObj.id()).removeFromExisting();
Charles Chan72779502016-04-23 17:36:10 -07001332 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Harshada Chaundkar9204f312019-07-02 16:01:24 +00001333 // Let's modify the next objective removing the bucket
1334 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi8cd46de2018-01-19 10:24:53 +01001335 ImmutableSet.of(port), nextObj.id()).removeFromExisting();
Harshada Chaundkar9204f312019-07-02 16:01:24 +00001336 if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
1337 log.debug("skip next flowobjective update for device: {}", deviceId);
1338 } else {
1339 // no need to update the flow here since we have updated the next objective + group
1340 // the existing flow will keep pointing to the updated nextobj
1341 srManager.flowObjectiveService.next(deviceId, newNextObj);
1342 }
Vignesh Ethiraj75790122019-08-26 12:18:42 +00001343 }
Charles Chanc91c8782016-03-30 17:54:24 -07001344 return existingPorts.isEmpty();
1345 }
1346
Charles Chan72779502016-04-23 17:36:10 -07001347 /**
1348 * Removes entire group on given device.
1349 *
1350 * @param deviceId device ID
1351 * @param mcastIp multicast group to be removed
1352 * @param assignedVlan assigned VLAN ID
1353 */
Charles Chanba59dd62018-05-10 22:19:49 +00001354 private void removeGroupFromDevice(DeviceId deviceId, IpAddress mcastIp,
1355 VlanId assignedVlan) {
Piere99511d2018-04-19 16:47:06 +02001356 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId, assignedVlan);
Charles Chan72779502016-04-23 17:36:10 -07001357 // This device is not serving this multicast group
1358 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
piereaddb182020-02-03 13:50:53 +01001359 log.debug("{} is not serving {}. Abort.", deviceId, mcastIp);
Charles Chan72779502016-04-23 17:36:10 -07001360 return;
1361 }
1362 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chan72779502016-04-23 17:36:10 -07001363 ObjectiveContext context = new DefaultObjectiveContext(
1364 (objective) -> log.debug("Successfully remove {} on {}, vlan {}",
1365 mcastIp, deviceId, assignedVlan),
Piere99511d2018-04-19 16:47:06 +02001366 (objective, error) -> log.warn("Failed to remove {} on {}, vlan {}: {}",
Charles Chan72779502016-04-23 17:36:10 -07001367 mcastIp, deviceId, assignedVlan, error));
Vignesh Ethiraj75790122019-08-26 12:18:42 +00001368 if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
1369 log.debug("skip flow changes on unconfigured device: {}", deviceId);
1370 } else {
1371 ForwardingObjective fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
1372 srManager.flowObjectiveService.forward(deviceId, fwdObj);
1373 }
Charles Chan72779502016-04-23 17:36:10 -07001374 mcastNextObjStore.remove(mcastStoreKey);
Charles Chan72779502016-04-23 17:36:10 -07001375 }
1376
Pier Luigi580fd8a2018-01-16 10:47:50 +01001377 private void installPath(IpAddress mcastIp, ConnectPoint source, Path mcastPath) {
Pier Luigi580fd8a2018-01-16 10:47:50 +01001378 List<Link> links = mcastPath.links();
kezhiyong168fbba2018-12-03 16:14:29 +08001379 if (links.isEmpty()) {
1380 log.warn("There is no link that can be used. Stopping installation.");
1381 return;
1382 }
Pier1a7e0c02018-03-12 15:00:54 -07001383 // Setup new ingress mcast role
Piere99511d2018-04-19 16:47:06 +02001384 mcastRoleStore.put(new McastRoleStoreKey(mcastIp, links.get(0).src().deviceId(), source),
Pier1a7e0c02018-03-12 15:00:54 -07001385 INGRESS);
Pier Luigi580fd8a2018-01-16 10:47:50 +01001386 // For each link, modify the next on the source device adding the src port
1387 // and a new filter objective on the destination port
1388 links.forEach(link -> {
1389 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
Pier7b657162018-03-27 11:29:42 -07001390 mcastUtils.assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
Harshada Chaundkar9204f312019-07-02 16:01:24 +00001391 McastFilteringObjStoreKey mcastFilterObjStoreKey = new McastFilteringObjStoreKey(link.dst(),
1392 mcastUtils.assignedVlan(null), mcastIp.isIp4());
1393 addFilterToDevice(mcastFilterObjStoreKey, mcastIp, null);
Pier Luigi580fd8a2018-01-16 10:47:50 +01001394 });
Pier1a7e0c02018-03-12 15:00:54 -07001395 // Setup mcast role for the transit
1396 links.stream()
1397 .filter(link -> !link.src().deviceId().equals(source.deviceId()))
Piere99511d2018-04-19 16:47:06 +02001398 .forEach(link -> mcastRoleStore.put(new McastRoleStoreKey(mcastIp, link.src().deviceId(), source),
Pier1a7e0c02018-03-12 15:00:54 -07001399 TRANSIT));
Charles Chan72779502016-04-23 17:36:10 -07001400 }
1401
Charles Chanc91c8782016-03-30 17:54:24 -07001402 /**
Pier1f87aca2018-03-14 16:47:32 -07001403 * Go through all the paths, looking for shared links to be used
1404 * in the final path computation.
1405 *
1406 * @param egresses egress devices
1407 * @param availablePaths all the available paths towards the egress
1408 * @return shared links between egress devices
1409 */
Charles Chanba59dd62018-05-10 22:19:49 +00001410 private Set<Link> exploreMcastTree(Set<DeviceId> egresses,
1411 Map<DeviceId, List<Path>> availablePaths) {
Pier1f87aca2018-03-14 16:47:32 -07001412 int minLength = Integer.MAX_VALUE;
1413 int length;
Pier1f87aca2018-03-14 16:47:32 -07001414 List<Path> currentPaths;
1415 // Verify the source can still reach all the egresses
1416 for (DeviceId egress : egresses) {
1417 // From the source we cannot reach all the sinks
Pier7b657162018-03-27 11:29:42 -07001418 // just continue and let's figure out after
Pier1f87aca2018-03-14 16:47:32 -07001419 currentPaths = availablePaths.get(egress);
1420 if (currentPaths.isEmpty()) {
1421 continue;
1422 }
Piere99511d2018-04-19 16:47:06 +02001423 // Get the length of the first one available, update the min length
Pier1f87aca2018-03-14 16:47:32 -07001424 length = currentPaths.get(0).links().size();
1425 if (length < minLength) {
1426 minLength = length;
1427 }
Pier Luigi51ee7c02018-02-23 19:57:40 +01001428 }
Pier1f87aca2018-03-14 16:47:32 -07001429 // If there are no paths
1430 if (minLength == Integer.MAX_VALUE) {
1431 return Collections.emptySet();
1432 }
Pier1f87aca2018-03-14 16:47:32 -07001433 int index = 0;
Pier1f87aca2018-03-14 16:47:32 -07001434 Set<Link> sharedLinks = Sets.newHashSet();
1435 Set<Link> currentSharedLinks;
1436 Set<Link> currentLinks;
Pier7b657162018-03-27 11:29:42 -07001437 DeviceId egressToRemove = null;
Pier1f87aca2018-03-14 16:47:32 -07001438 // Let's find out the shared links
1439 while (index < minLength) {
1440 // Initialize the intersection with the paths related to the first egress
Piere99511d2018-04-19 16:47:06 +02001441 currentPaths = availablePaths.get(egresses.stream().findFirst().orElse(null));
Pier1f87aca2018-03-14 16:47:32 -07001442 currentSharedLinks = Sets.newHashSet();
1443 // Iterate over the paths and take the "index" links
1444 for (Path path : currentPaths) {
1445 currentSharedLinks.add(path.links().get(index));
1446 }
1447 // Iterate over the remaining egress
1448 for (DeviceId egress : egresses) {
1449 // Iterate over the paths and take the "index" links
1450 currentLinks = Sets.newHashSet();
1451 for (Path path : availablePaths.get(egress)) {
1452 currentLinks.add(path.links().get(index));
1453 }
1454 // Do intersection
1455 currentSharedLinks = Sets.intersection(currentSharedLinks, currentLinks);
1456 // If there are no shared paths exit and record the device to remove
1457 // we have to retry with a subset of sinks
1458 if (currentSharedLinks.isEmpty()) {
Pier7b657162018-03-27 11:29:42 -07001459 egressToRemove = egress;
Pier1f87aca2018-03-14 16:47:32 -07001460 index = minLength;
1461 break;
1462 }
1463 }
1464 sharedLinks.addAll(currentSharedLinks);
1465 index++;
1466 }
Piere99511d2018-04-19 16:47:06 +02001467 // If the shared links is empty and there are egress let's retry another time with less sinks,
1468 // we can still build optimal subtrees
Pier7b657162018-03-27 11:29:42 -07001469 if (sharedLinks.isEmpty() && egresses.size() > 1 && egressToRemove != null) {
1470 egresses.remove(egressToRemove);
Pier1f87aca2018-03-14 16:47:32 -07001471 sharedLinks = exploreMcastTree(egresses, availablePaths);
1472 }
1473 return sharedLinks;
1474 }
1475
1476 /**
1477 * Build Mcast tree having as root the given source and as leaves the given egress points.
1478 *
piereaddb182020-02-03 13:50:53 +01001479 * @param mcastIp multicast group
Pier1f87aca2018-03-14 16:47:32 -07001480 * @param source source of the tree
1481 * @param sinks leaves of the tree
1482 * @return the computed Mcast tree
1483 */
piereaddb182020-02-03 13:50:53 +01001484 private Map<ConnectPoint, List<Path>> computeSinkMcastTree(IpAddress mcastIp,
1485 DeviceId source,
Charles Chanba59dd62018-05-10 22:19:49 +00001486 Set<ConnectPoint> sinks) {
Pier1f87aca2018-03-14 16:47:32 -07001487 // Get the egress devices, remove source from the egress if present
Piere99511d2018-04-19 16:47:06 +02001488 Set<DeviceId> egresses = sinks.stream().map(ConnectPoint::deviceId)
1489 .filter(deviceId -> !deviceId.equals(source)).collect(Collectors.toSet());
piereaddb182020-02-03 13:50:53 +01001490 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(mcastIp, source, egresses);
Pier1f87aca2018-03-14 16:47:32 -07001491 final Map<ConnectPoint, List<Path>> finalTree = Maps.newHashMap();
Pier7b657162018-03-27 11:29:42 -07001492 // We need to put back the source if it was originally present
1493 sinks.forEach(sink -> {
1494 List<Path> sinkPaths = mcastTree.get(sink.deviceId());
1495 finalTree.put(sink, sinkPaths != null ? sinkPaths : ImmutableList.of());
1496 });
Pier1f87aca2018-03-14 16:47:32 -07001497 return finalTree;
1498 }
1499
1500 /**
1501 * Build Mcast tree having as root the given source and as leaves the given egress.
1502 *
piereaddb182020-02-03 13:50:53 +01001503 * @param mcastIp multicast group
Pier1f87aca2018-03-14 16:47:32 -07001504 * @param source source of the tree
1505 * @param egresses leaves of the tree
1506 * @return the computed Mcast tree
1507 */
piereaddb182020-02-03 13:50:53 +01001508 private Map<DeviceId, List<Path>> computeMcastTree(IpAddress mcastIp,
1509 DeviceId source,
Pier1f87aca2018-03-14 16:47:32 -07001510 Set<DeviceId> egresses) {
piereaddb182020-02-03 13:50:53 +01001511 log.debug("Computing tree for Multicast group {}, source {} and leafs {}",
1512 mcastIp, source, egresses);
Pier1f87aca2018-03-14 16:47:32 -07001513 // Pre-compute all the paths
1514 Map<DeviceId, List<Path>> availablePaths = Maps.newHashMap();
Pier1f87aca2018-03-14 16:47:32 -07001515 egresses.forEach(egress -> availablePaths.put(egress, getPaths(source, egress,
1516 Collections.emptySet())));
1517 // Explore the topology looking for shared links amongst the egresses
1518 Set<Link> linksToEnforce = exploreMcastTree(Sets.newHashSet(egresses), availablePaths);
Pier1f87aca2018-03-14 16:47:32 -07001519 // Build the final paths enforcing the shared links between egress devices
Piere99511d2018-04-19 16:47:06 +02001520 availablePaths.clear();
Pier1f87aca2018-03-14 16:47:32 -07001521 egresses.forEach(egress -> availablePaths.put(egress, getPaths(source, egress,
1522 linksToEnforce)));
1523 return availablePaths;
1524 }
1525
1526 /**
1527 * Gets path from src to dst computed using the custom link weigher.
1528 *
1529 * @param src source device ID
1530 * @param dst destination device ID
1531 * @return list of paths from src to dst
1532 */
1533 private List<Path> getPaths(DeviceId src, DeviceId dst, Set<Link> linksToEnforce) {
Pier1f87aca2018-03-14 16:47:32 -07001534 final Topology currentTopology = topologyService.currentTopology();
Pier1f87aca2018-03-14 16:47:32 -07001535 final LinkWeigher linkWeigher = new SRLinkWeigher(srManager, src, linksToEnforce);
Piere99511d2018-04-19 16:47:06 +02001536 List<Path> allPaths = Lists.newArrayList(topologyService.getPaths(currentTopology, src, dst, linkWeigher));
piereaddb182020-02-03 13:50:53 +01001537 log.trace("{} path(s) found from {} to {}", allPaths.size(), src, dst);
Pier1f87aca2018-03-14 16:47:32 -07001538 return allPaths;
Pier Luigi51ee7c02018-02-23 19:57:40 +01001539 }
1540
Charles Chanc91c8782016-03-30 17:54:24 -07001541 /**
1542 * Gets a path from src to dst.
1543 * If a path was allocated before, returns the allocated path.
1544 * Otherwise, randomly pick one from available paths.
1545 *
1546 * @param src source device ID
1547 * @param dst destination device ID
1548 * @param mcastIp multicast group
Pier1f87aca2018-03-14 16:47:32 -07001549 * @param allPaths paths list
Charles Chanc91c8782016-03-30 17:54:24 -07001550 * @return an optional path from src to dst
1551 */
Piere99511d2018-04-19 16:47:06 +02001552 private Optional<Path> getPath(DeviceId src, DeviceId dst, IpAddress mcastIp,
1553 List<Path> allPaths, ConnectPoint source) {
Pier1f87aca2018-03-14 16:47:32 -07001554 if (allPaths == null) {
1555 allPaths = getPaths(src, dst, Collections.emptySet());
1556 }
Charles Chanc91c8782016-03-30 17:54:24 -07001557 if (allPaths.isEmpty()) {
Charles Chanc91c8782016-03-30 17:54:24 -07001558 return Optional.empty();
1559 }
Piere99511d2018-04-19 16:47:06 +02001560 // Create a map index of suitability-to-list of paths. For example
Pier Luigi91573e12018-01-23 16:06:38 +01001561 // a path in the list associated to the index 1 shares only the
1562 // first hop and it is less suitable of a path belonging to the index
1563 // 2 that shares leaf-spine.
1564 Map<Integer, List<Path>> eligiblePaths = Maps.newHashMap();
Pier Luigi91573e12018-01-23 16:06:38 +01001565 int nhop;
1566 McastStoreKey mcastStoreKey;
Pier Luigi91573e12018-01-23 16:06:38 +01001567 PortNumber srcPort;
1568 Set<PortNumber> existingPorts;
1569 NextObjective nextObj;
Pier Luigi91573e12018-01-23 16:06:38 +01001570 for (Path path : allPaths) {
Pier Luigi91573e12018-01-23 16:06:38 +01001571 if (!src.equals(path.links().get(0).src().deviceId())) {
1572 continue;
1573 }
1574 nhop = 0;
1575 // Iterate over the links
Piere99511d2018-04-19 16:47:06 +02001576 for (Link hop : path.links()) {
1577 VlanId assignedVlan = mcastUtils.assignedVlan(hop.src().deviceId().equals(src) ?
1578 source : null);
1579 mcastStoreKey = new McastStoreKey(mcastIp, hop.src().deviceId(), assignedVlan);
1580 // It does not exist in the store, go to the next link
Pier Luigi91573e12018-01-23 16:06:38 +01001581 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Piere99511d2018-04-19 16:47:06 +02001582 continue;
Charles Chanc91c8782016-03-30 17:54:24 -07001583 }
Pier Luigi91573e12018-01-23 16:06:38 +01001584 nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Pier7b657162018-03-27 11:29:42 -07001585 existingPorts = mcastUtils.getPorts(nextObj.next());
Pier Luigi91573e12018-01-23 16:06:38 +01001586 srcPort = hop.src().port();
Piere99511d2018-04-19 16:47:06 +02001587 // the src port is not used as output, go to the next link
Pier Luigi91573e12018-01-23 16:06:38 +01001588 if (!existingPorts.contains(srcPort)) {
Piere99511d2018-04-19 16:47:06 +02001589 continue;
Pier Luigi91573e12018-01-23 16:06:38 +01001590 }
1591 nhop++;
1592 }
1593 // n_hop defines the index
1594 if (nhop > 0) {
1595 eligiblePaths.compute(nhop, (index, paths) -> {
1596 paths = paths == null ? Lists.newArrayList() : paths;
1597 paths.add(path);
1598 return paths;
1599 });
Charles Chanc91c8782016-03-30 17:54:24 -07001600 }
1601 }
Pier Luigi91573e12018-01-23 16:06:38 +01001602 if (eligiblePaths.isEmpty()) {
piereaddb182020-02-03 13:50:53 +01001603 log.trace("No eligiblePath(s) found from {} to {}", src, dst);
Pier Luigi91573e12018-01-23 16:06:38 +01001604 Collections.shuffle(allPaths);
1605 return allPaths.stream().findFirst();
1606 }
Pier Luigi91573e12018-01-23 16:06:38 +01001607 // Let's take the best ones
Piere99511d2018-04-19 16:47:06 +02001608 Integer bestIndex = eligiblePaths.keySet().stream()
1609 .sorted(Comparator.reverseOrder()).findFirst().orElse(null);
Pier Luigi91573e12018-01-23 16:06:38 +01001610 List<Path> bestPaths = eligiblePaths.get(bestIndex);
piereaddb182020-02-03 13:50:53 +01001611 log.trace("{} eligiblePath(s) found from {} to {}",
Pier Luigi91573e12018-01-23 16:06:38 +01001612 bestPaths.size(), src, dst);
Pier Luigi91573e12018-01-23 16:06:38 +01001613 Collections.shuffle(bestPaths);
1614 return bestPaths.stream().findFirst();
Charles Chanc91c8782016-03-30 17:54:24 -07001615 }
1616
1617 /**
Piere99511d2018-04-19 16:47:06 +02001618 * Gets device(s) of given role and of given source in given multicast tree.
1619 *
1620 * @param mcastIp multicast IP
1621 * @param role multicast role
1622 * @param source source connect point
1623 * @return set of device ID or empty set if not found
1624 */
1625 private Set<DeviceId> getDevice(IpAddress mcastIp, McastRole role, ConnectPoint source) {
1626 return mcastRoleStore.entrySet().stream()
1627 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
Charles Chanba59dd62018-05-10 22:19:49 +00001628 entry.getKey().source().equals(source) &&
1629 entry.getValue().value() == role)
Piere99511d2018-04-19 16:47:06 +02001630 .map(Entry::getKey).map(McastRoleStoreKey::deviceId).collect(Collectors.toSet());
1631 }
1632
1633 /**
Charles Chan72779502016-04-23 17:36:10 -07001634 * Gets device(s) of given role in given multicast group.
1635 *
1636 * @param mcastIp multicast IP
1637 * @param role multicast role
1638 * @return set of device ID or empty set if not found
1639 */
1640 private Set<DeviceId> getDevice(IpAddress mcastIp, McastRole role) {
1641 return mcastRoleStore.entrySet().stream()
1642 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
1643 entry.getValue().value() == role)
Piere99511d2018-04-19 16:47:06 +02001644 .map(Entry::getKey).map(McastRoleStoreKey::deviceId).collect(Collectors.toSet());
1645 }
1646
1647 /**
1648 * Gets source(s) of given role, given device in given multicast group.
1649 *
1650 * @param mcastIp multicast IP
1651 * @param deviceId device id
1652 * @param role multicast role
1653 * @return set of device ID or empty set if not found
1654 */
1655 private Set<ConnectPoint> getSources(IpAddress mcastIp, DeviceId deviceId, McastRole role) {
1656 return mcastRoleStore.entrySet().stream()
1657 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
1658 entry.getKey().deviceId().equals(deviceId) && entry.getValue().value() == role)
1659 .map(Entry::getKey).map(McastRoleStoreKey::source).collect(Collectors.toSet());
1660 }
1661
1662 /**
1663 * Gets source(s) of given multicast group.
1664 *
1665 * @param mcastIp multicast IP
1666 * @return set of device ID or empty set if not found
1667 */
1668 private Set<ConnectPoint> getSources(IpAddress mcastIp) {
1669 return mcastRoleStore.entrySet().stream()
1670 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp))
1671 .map(Entry::getKey).map(McastRoleStoreKey::source).collect(Collectors.toSet());
Charles Chan72779502016-04-23 17:36:10 -07001672 }
1673
1674 /**
1675 * Gets groups which is affected by the link down event.
1676 *
1677 * @param link link going down
1678 * @return a set of multicast IpAddress
1679 */
1680 private Set<IpAddress> getAffectedGroups(Link link) {
1681 DeviceId deviceId = link.src().deviceId();
1682 PortNumber port = link.src().port();
1683 return mcastNextObjStore.entrySet().stream()
1684 .filter(entry -> entry.getKey().deviceId().equals(deviceId) &&
Piere99511d2018-04-19 16:47:06 +02001685 mcastUtils.getPorts(entry.getValue().value().next()).contains(port))
1686 .map(Entry::getKey).map(McastStoreKey::mcastIp).collect(Collectors.toSet());
Charles Chan72779502016-04-23 17:36:10 -07001687 }
1688
1689 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +01001690 * Gets groups which are affected by the device down event.
1691 *
1692 * @param deviceId device going down
1693 * @return a set of multicast IpAddress
1694 */
1695 private Set<IpAddress> getAffectedGroups(DeviceId deviceId) {
1696 return mcastNextObjStore.entrySet().stream()
1697 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
Pier1f87aca2018-03-14 16:47:32 -07001698 .map(Entry::getKey).map(McastStoreKey::mcastIp)
Pier Luigi580fd8a2018-01-16 10:47:50 +01001699 .collect(Collectors.toSet());
1700 }
1701
1702 /**
Charles Chan72779502016-04-23 17:36:10 -07001703 * Gets the spine-facing port on ingress device of given multicast group.
1704 *
1705 * @param mcastIp multicast IP
Piere99511d2018-04-19 16:47:06 +02001706 * @param ingressDevice the ingress device
1707 * @param source the source connect point
Charles Chan72779502016-04-23 17:36:10 -07001708 * @return spine-facing port on ingress device
1709 */
Charles Chanba59dd62018-05-10 22:19:49 +00001710 private Set<PortNumber> ingressTransitPort(IpAddress mcastIp, DeviceId ingressDevice,
1711 ConnectPoint source) {
Pier1a7e0c02018-03-12 15:00:54 -07001712 ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
Charles Chan72779502016-04-23 17:36:10 -07001713 if (ingressDevice != null) {
Andrea Campanella5b4cd652018-06-05 14:19:21 +02001714 Versioned<NextObjective> nextObjVers = mcastNextObjStore.get(new McastStoreKey(mcastIp, ingressDevice,
1715 mcastUtils.assignedVlan(source)));
1716 if (nextObjVers == null) {
1717 log.warn("Absent next objective for {}", new McastStoreKey(mcastIp, ingressDevice,
1718 mcastUtils.assignedVlan(source)));
1719 return portBuilder.build();
1720 }
1721 NextObjective nextObj = nextObjVers.value();
Pier7b657162018-03-27 11:29:42 -07001722 Set<PortNumber> ports = mcastUtils.getPorts(nextObj.next());
Pier1a7e0c02018-03-12 15:00:54 -07001723 // Let's find out all the ingress-transit ports
Charles Chan72779502016-04-23 17:36:10 -07001724 for (PortNumber port : ports) {
1725 // Spine-facing port should have no subnet and no xconnect
Pier Luigi69f774d2018-02-28 12:10:50 +01001726 if (srManager.deviceConfiguration() != null &&
1727 srManager.deviceConfiguration().getPortSubnets(ingressDevice, port).isEmpty() &&
Charles Chan8d316332018-06-19 20:31:57 -07001728 (srManager.xconnectService == null ||
1729 !srManager.xconnectService.hasXconnect(new ConnectPoint(ingressDevice, port)))) {
Pier1a7e0c02018-03-12 15:00:54 -07001730 portBuilder.add(port);
Charles Chan72779502016-04-23 17:36:10 -07001731 }
1732 }
1733 }
Pier1a7e0c02018-03-12 15:00:54 -07001734 return portBuilder.build();
Charles Chan72779502016-04-23 17:36:10 -07001735 }
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001736
1737 /**
Pier28164682018-04-17 15:50:43 +02001738 * Verify if a given connect point is sink for this group.
1739 *
1740 * @param mcastIp group address
1741 * @param connectPoint connect point to be verified
Piere99511d2018-04-19 16:47:06 +02001742 * @param source source connect point
Pier28164682018-04-17 15:50:43 +02001743 * @return true if the connect point is sink of the group
1744 */
Charles Chanba59dd62018-05-10 22:19:49 +00001745 private boolean isSinkForGroup(IpAddress mcastIp, ConnectPoint connectPoint,
1746 ConnectPoint source) {
Piere99511d2018-04-19 16:47:06 +02001747 VlanId assignedVlan = mcastUtils.assignedVlan(connectPoint.deviceId().equals(source.deviceId()) ?
1748 source : null);
1749 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, connectPoint.deviceId(), assignedVlan);
Pier28164682018-04-17 15:50:43 +02001750 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
1751 return false;
1752 }
Pier28164682018-04-17 15:50:43 +02001753 NextObjective mcastNext = mcastNextObjStore.get(mcastStoreKey).value();
1754 return mcastUtils.getPorts(mcastNext.next()).contains(connectPoint.port());
1755 }
1756
1757 /**
Piere99511d2018-04-19 16:47:06 +02001758 * Verify if a given connect point is sink for this group and for this source.
1759 *
1760 * @param mcastIp group address
1761 * @param connectPoint connect point to be verified
1762 * @param source source connect point
1763 * @return true if the connect point is sink of the group
1764 */
Charles Chanba59dd62018-05-10 22:19:49 +00001765 private boolean isSinkForSource(IpAddress mcastIp, ConnectPoint connectPoint,
1766 ConnectPoint source) {
Piere99511d2018-04-19 16:47:06 +02001767 boolean isSink = isSinkForGroup(mcastIp, connectPoint, source);
1768 DeviceId device;
1769 if (connectPoint.deviceId().equals(source.deviceId())) {
1770 device = getDevice(mcastIp, INGRESS, source).stream()
1771 .filter(deviceId -> deviceId.equals(connectPoint.deviceId()))
1772 .findFirst().orElse(null);
1773 } else {
1774 device = getDevice(mcastIp, EGRESS, source).stream()
1775 .filter(deviceId -> deviceId.equals(connectPoint.deviceId()))
1776 .findFirst().orElse(null);
1777 }
1778 return isSink && device != null;
1779 }
1780
1781 /**
1782 * Verify if a sink is reachable from this source.
1783 *
1784 * @param mcastIp group address
1785 * @param sink connect point to be verified
1786 * @param source source connect point
1787 * @return true if the connect point is reachable from the source
1788 */
Charles Chanba59dd62018-05-10 22:19:49 +00001789 private boolean isSinkReachable(IpAddress mcastIp, ConnectPoint sink,
1790 ConnectPoint source) {
Piere99511d2018-04-19 16:47:06 +02001791 return sink.deviceId().equals(source.deviceId()) ||
1792 getPath(source.deviceId(), sink.deviceId(), mcastIp, null, source).isPresent();
1793 }
1794
1795 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +01001796 * Updates filtering objective for given device and port.
1797 * It is called in general when the mcast config has been
1798 * changed.
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001799 *
1800 * @param deviceId device ID
1801 * @param portNum ingress port number
1802 * @param vlanId assigned VLAN ID
1803 * @param install true to add, false to remove
1804 */
Charles Chanba59dd62018-05-10 22:19:49 +00001805 public void updateFilterToDevice(DeviceId deviceId, PortNumber portNum,
1806 VlanId vlanId, boolean install) {
Pier Luigi35dab3f2018-01-25 16:16:02 +01001807 lastMcastChange = Instant.now();
1808 mcastLock();
1809 try {
Piere99511d2018-04-19 16:47:06 +02001810 // Iterates over the route and updates properly the filtering objective on the source device.
Pier Luigi35dab3f2018-01-25 16:16:02 +01001811 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
Pierdb27b8d2018-04-17 16:29:56 +02001812 log.debug("Update filter for {}", mcastRoute.group());
Pierdb27b8d2018-04-17 16:29:56 +02001813 if (!mcastUtils.isLeader(mcastRoute.group())) {
1814 log.debug("Skip {} due to lack of leadership", mcastRoute.group());
1815 return;
1816 }
Piere99511d2018-04-19 16:47:06 +02001817 // Get the sources and for each one update properly the filtering objectives
1818 Set<ConnectPoint> sources = srManager.multicastRouteService.sources(mcastRoute);
1819 sources.forEach(source -> {
1820 if (source.deviceId().equals(deviceId) && source.port().equals(portNum)) {
1821 if (install) {
Harshada Chaundkar9204f312019-07-02 16:01:24 +00001822 McastFilteringObjStoreKey mcastFilterObjStoreKey = new McastFilteringObjStoreKey(source,
1823 vlanId, mcastRoute.group().isIp4());
1824 addFilterToDevice(mcastFilterObjStoreKey, mcastRoute.group(), INGRESS);
Piere99511d2018-04-19 16:47:06 +02001825 } else {
1826 mcastUtils.removeFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), null);
1827 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001828 }
Piere99511d2018-04-19 16:47:06 +02001829 });
Pier Luigi35dab3f2018-01-25 16:16:02 +01001830 });
1831 } finally {
1832 mcastUnlock();
1833 }
1834 }
1835
1836 /**
Harshada Chaundkar9204f312019-07-02 16:01:24 +00001837 * Add filtering to the device if needed.
1838 *
1839 * @param filterObjStoreKey the filtering obj key
1840 * @param mcastIp the multicast group
1841 * @param mcastRole the multicast role
1842 */
1843 private void addFilterToDevice(McastFilteringObjStoreKey filterObjStoreKey,
1844 IpAddress mcastIp,
1845 McastRole mcastRole) {
1846 if (!containsFilterInTheDevice(filterObjStoreKey)) {
1847 // if this is the first sink for this group/device
1848 // match additionally on mac
1849 log.debug("Filtering not available for device {}, vlan {} and {}",
1850 filterObjStoreKey.ingressCP().deviceId(), filterObjStoreKey.vlanId(),
1851 filterObjStoreKey.isIpv4() ? "IPv4" : "IPv6");
1852 mcastUtils.addFilterToDevice(filterObjStoreKey.ingressCP().deviceId(),
1853 filterObjStoreKey.ingressCP().port(),
1854 filterObjStoreKey.vlanId(), mcastIp,
1855 mcastRole, true);
1856 mcastFilteringObjStore.add(filterObjStoreKey);
1857 } else if (!mcastFilteringObjStore.contains(filterObjStoreKey)) {
1858 // match only vlan
1859 log.debug("Filtering not available for connect point {}, vlan {} and {}",
1860 filterObjStoreKey.ingressCP(), filterObjStoreKey.vlanId(),
1861 filterObjStoreKey.isIpv4() ? "IPv4" : "IPv6");
1862 mcastUtils.addFilterToDevice(filterObjStoreKey.ingressCP().deviceId(),
1863 filterObjStoreKey.ingressCP().port(),
1864 filterObjStoreKey.vlanId(), mcastIp,
1865 mcastRole, false);
1866 mcastFilteringObjStore.add(filterObjStoreKey);
1867 } else {
1868 // do nothing
1869 log.debug("Filtering already present. Abort");
1870 }
1871 }
1872
1873 /**
1874 * Verify if there are related filtering obj in the device.
1875 *
1876 * @param filteringKey the filtering obj key
1877 * @return true if related filtering obj are found
1878 */
1879 private boolean containsFilterInTheDevice(McastFilteringObjStoreKey filteringKey) {
1880 // check if filters are already added on the device
1881 McastFilteringObjStoreKey key = mcastFilteringObjStore.stream()
1882 .filter(mcastFilteringKey ->
1883 mcastFilteringKey.ingressCP().deviceId().equals(filteringKey.ingressCP().deviceId())
1884 && mcastFilteringKey.isIpv4() == filteringKey.isIpv4()
1885 && mcastFilteringKey.vlanId().equals(filteringKey.vlanId())
1886 ).findFirst().orElse(null);
1887 // we are interested to filt obj on the same device, same vlan and same ip type
1888 return key != null;
1889 }
1890
1891 /**
1892 * Update the filtering objective store upon device failure.
1893 *
1894 * @param affectedDevice the affected device
1895 */
1896 private void updateFilterObjStoreByDevice(DeviceId affectedDevice) {
1897 // purge the related filter objective key
1898 Set<McastFilteringObjStoreKey> filterObjs = Sets.newHashSet(mcastFilteringObjStore);
1899 Iterator<McastFilteringObjStoreKey> filterIterator = filterObjs.iterator();
1900 McastFilteringObjStoreKey filterKey;
1901 while (filterIterator.hasNext()) {
1902 filterKey = filterIterator.next();
1903 if (filterKey.ingressCP().deviceId().equals(affectedDevice)) {
1904 mcastFilteringObjStore.remove(filterKey);
1905 }
1906 }
1907 }
1908
1909 /**
1910 * Update the filtering objective store upon port failure.
1911 *
1912 * @param affectedPort the affected port
1913 */
1914 private void updateFilterObjStoreByPort(ConnectPoint affectedPort) {
1915 // purge the related filter objective key
1916 Set<McastFilteringObjStoreKey> filterObjs = Sets.newHashSet(mcastFilteringObjStore);
1917 Iterator<McastFilteringObjStoreKey> filterIterator = filterObjs.iterator();
1918 McastFilteringObjStoreKey filterKey;
1919 while (filterIterator.hasNext()) {
1920 filterKey = filterIterator.next();
1921 if (filterKey.ingressCP().equals(affectedPort)) {
1922 mcastFilteringObjStore.remove(filterKey);
1923 }
1924 }
1925 }
1926
1927 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +01001928 * Performs bucket verification operation for all mcast groups in the devices.
1929 * Firstly, it verifies that mcast is stable before trying verification operation.
1930 * Verification consists in creating new nexts with VERIFY operation. Actually,
1931 * the operation is totally delegated to the driver.
1932 */
Piere99511d2018-04-19 16:47:06 +02001933 private final class McastBucketCorrector implements Runnable {
Harshada Chaundkar9204f312019-07-02 16:01:24 +00001934 // Internal params
pierc32ef422020-01-27 17:45:03 +01001935 private static final int MAX_VERIFY_ON_FLIGHT = 10;
1936 private final AtomicInteger verifyOnFlight = new AtomicInteger(0);
1937 // Define the context used for the back pressure mechanism
1938 private final ObjectiveContext context = new DefaultObjectiveContext(
1939 (objective) -> {
1940 synchronized (verifyOnFlight) {
1941 verifyOnFlight.decrementAndGet();
1942 verifyOnFlight.notify();
1943 }
1944 },
1945 (objective, error) -> {
1946 synchronized (verifyOnFlight) {
1947 verifyOnFlight.decrementAndGet();
1948 verifyOnFlight.notify();
1949 }
1950 });
1951
Pier Luigi35dab3f2018-01-25 16:16:02 +01001952 @Override
1953 public void run() {
pierc32ef422020-01-27 17:45:03 +01001954 if (!isMcastStable() || wasBktCorrRunning()) {
Pier Luigi35dab3f2018-01-25 16:16:02 +01001955 return;
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001956 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001957 mcastLock();
1958 try {
1959 // Iterates over the routes and verify the related next objectives
pierc32ef422020-01-27 17:45:03 +01001960 for (McastRoute mcastRoute : srManager.multicastRouteService.getRoutes()) {
1961 IpAddress mcastIp = mcastRoute.group();
1962 log.trace("Running mcast buckets corrector for mcast group: {}", mcastIp);
1963 // Verify leadership on the operation
1964 if (!mcastUtils.isLeader(mcastIp)) {
1965 log.trace("Skip {} due to lack of leadership", mcastIp);
1966 continue;
1967 }
1968 // Get sources and sinks from Mcast Route Service and warn about errors
1969 Set<ConnectPoint> sources = mcastUtils.getSources(mcastIp);
1970 Set<ConnectPoint> sinks = mcastUtils.getSinks(mcastIp).values().stream()
1971 .flatMap(Collection::stream).collect(Collectors.toSet());
1972 // Do not proceed if sources of this group are missing
1973 if (sources.isEmpty()) {
1974 if (!sinks.isEmpty()) {
1975 log.warn("Unable to run buckets corrector. " +
1976 "Missing source {} for group {}", sources, mcastIp);
Piere99511d2018-04-19 16:47:06 +02001977 }
pierc32ef422020-01-27 17:45:03 +01001978 continue;
1979 }
1980 // For each group we get current information in the store
1981 // and issue a check of the next objectives in place
1982 Set<McastStoreKey> processedKeys = Sets.newHashSet();
1983 for (ConnectPoint source : sources) {
1984 Set<DeviceId> ingressDevices = getDevice(mcastIp, INGRESS, source);
1985 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
1986 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
1987 // Do not proceed if ingress devices are missing
1988 if (ingressDevices.isEmpty()) {
Pier Luigi92e69be2018-03-02 12:53:37 +01001989 if (!sinks.isEmpty()) {
1990 log.warn("Unable to run buckets corrector. " +
pierc32ef422020-01-27 17:45:03 +01001991 "Missing ingress {} for source {} and for group {}",
1992 ingressDevices, source, mcastIp);
Pier Luigi92e69be2018-03-02 12:53:37 +01001993 }
pierc32ef422020-01-27 17:45:03 +01001994 continue;
Pier Luigi35dab3f2018-01-25 16:16:02 +01001995 }
pierc32ef422020-01-27 17:45:03 +01001996 // Create the set of the devices to be processed
1997 ImmutableSet.Builder<DeviceId> devicesBuilder = ImmutableSet.builder();
1998 if (!ingressDevices.isEmpty()) {
1999 devicesBuilder.addAll(ingressDevices);
2000 }
2001 if (!transitDevices.isEmpty()) {
2002 devicesBuilder.addAll(transitDevices);
2003 }
2004 if (!egressDevices.isEmpty()) {
2005 devicesBuilder.addAll(egressDevices);
2006 }
2007 Set<DeviceId> devicesToProcess = devicesBuilder.build();
2008 for (DeviceId deviceId : devicesToProcess) {
2009 if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
2010 log.trace("Skipping Bucket corrector for unconfigured device {}", deviceId);
Harshada Chaundkar9204f312019-07-02 16:01:24 +00002011 continue;
Pier Luigi35dab3f2018-01-25 16:16:02 +01002012 }
pierc32ef422020-01-27 17:45:03 +01002013 synchronized (verifyOnFlight) {
2014 while (verifyOnFlight.get() == MAX_VERIFY_ON_FLIGHT) {
2015 verifyOnFlight.wait();
Vignesh Ethiraj75790122019-08-26 12:18:42 +00002016 }
pierc32ef422020-01-27 17:45:03 +01002017 }
2018 VlanId assignedVlan = mcastUtils.assignedVlan(deviceId.equals(source.deviceId()) ?
2019 source : null);
2020 McastStoreKey currentKey = new McastStoreKey(mcastIp, deviceId, assignedVlan);
2021 // Check if we already processed this next - trees merge at some point
2022 if (processedKeys.contains(currentKey)) {
2023 continue;
2024 }
2025 // Verify the nextobjective or skip to next device
2026 if (mcastNextObjStore.containsKey(currentKey)) {
2027 NextObjective currentNext = mcastNextObjStore.get(currentKey).value();
2028 // Rebuild the next objective using assigned vlan
2029 currentNext = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
2030 mcastUtils.getPorts(currentNext.next()), currentNext.id()).verify(context);
2031 // Send to the flowobjective service
2032 srManager.flowObjectiveService.next(deviceId, currentNext);
2033 verifyOnFlight.incrementAndGet();
2034 log.trace("Verify on flight {}", verifyOnFlight);
2035 processedKeys.add(currentKey);
2036 } else {
2037 log.warn("Unable to run buckets corrector. " +
2038 "Missing next for {}, for source {} and for group {}",
2039 deviceId, source, mcastIp);
2040 }
2041 }
2042 }
2043 }
2044 } catch (InterruptedException e) {
2045 log.warn("BktCorr has been interrupted");
Pier Luigi35dab3f2018-01-25 16:16:02 +01002046 } finally {
pierc32ef422020-01-27 17:45:03 +01002047 lastBktCorrExecution = Instant.now();
Pier Luigi35dab3f2018-01-25 16:16:02 +01002048 mcastUnlock();
2049 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01002050 }
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07002051 }
Pier Luigi0f9635b2018-01-15 18:06:43 +01002052
Piere99511d2018-04-19 16:47:06 +02002053 /**
2054 * Returns the associated next ids to the mcast groups or to the single
2055 * group if mcastIp is present.
2056 *
2057 * @param mcastIp the group ip
2058 * @return the mapping mcastIp-device to next id
2059 */
Charles Chan0b1dd7e2018-08-19 19:21:46 -07002060 public Map<McastStoreKey, Integer> getNextIds(IpAddress mcastIp) {
Pier Luigi0f9635b2018-01-15 18:06:43 +01002061 if (mcastIp != null) {
2062 return mcastNextObjStore.entrySet().stream()
2063 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
Piere99511d2018-04-19 16:47:06 +02002064 .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().value().id()));
Pier Luigi0f9635b2018-01-15 18:06:43 +01002065 }
Pier Luigi0f9635b2018-01-15 18:06:43 +01002066 return mcastNextObjStore.entrySet().stream()
Piere99511d2018-04-19 16:47:06 +02002067 .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().value().id()));
Pier Luigi0f9635b2018-01-15 18:06:43 +01002068 }
2069
Pier71c55772018-04-17 17:25:22 +02002070 /**
Charles Chan0b1dd7e2018-08-19 19:21:46 -07002071 * Removes given next ID from mcast next id store.
2072 *
2073 * @param nextId next id
2074 */
2075 public void removeNextId(int nextId) {
2076 mcastNextObjStore.entrySet().forEach(e -> {
2077 if (e.getValue().value().id() == nextId) {
2078 mcastNextObjStore.remove(e.getKey());
2079 }
2080 });
2081 }
2082
2083 /**
Piere99511d2018-04-19 16:47:06 +02002084 * Returns the associated roles to the mcast groups.
2085 *
2086 * @param mcastIp the group ip
2087 * @param sourcecp the source connect point
2088 * @return the mapping mcastIp-device to mcast role
2089 */
Charles Chanba59dd62018-05-10 22:19:49 +00002090 public Map<McastRoleStoreKey, McastRole> getMcastRoles(IpAddress mcastIp,
2091 ConnectPoint sourcecp) {
Piere99511d2018-04-19 16:47:06 +02002092 if (mcastIp != null) {
2093 Map<McastRoleStoreKey, McastRole> roles = mcastRoleStore.entrySet().stream()
2094 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
2095 .collect(Collectors.toMap(entry -> new McastRoleStoreKey(entry.getKey().mcastIp(),
2096 entry.getKey().deviceId(), entry.getKey().source()), entry -> entry.getValue().value()));
2097 if (sourcecp != null) {
2098 roles = roles.entrySet().stream()
2099 .filter(mcastEntry -> sourcecp.equals(mcastEntry.getKey().source()))
2100 .collect(Collectors.toMap(entry -> new McastRoleStoreKey(entry.getKey().mcastIp(),
2101 entry.getKey().deviceId(), entry.getKey().source()), Entry::getValue));
2102 }
2103 return roles;
2104 }
2105 return mcastRoleStore.entrySet().stream()
2106 .collect(Collectors.toMap(entry -> new McastRoleStoreKey(entry.getKey().mcastIp(),
2107 entry.getKey().deviceId(), entry.getKey().source()), entry -> entry.getValue().value()));
2108 }
2109
Pier71c55772018-04-17 17:25:22 +02002110 /**
2111 * Returns the associated trees to the mcast group.
2112 *
2113 * @param mcastIp the group ip
2114 * @param sourcecp the source connect point
2115 * @return the mapping egress point to mcast path
2116 */
Charles Chanba59dd62018-05-10 22:19:49 +00002117 public Multimap<ConnectPoint, List<ConnectPoint>> getMcastTrees(IpAddress mcastIp,
2118 ConnectPoint sourcecp) {
Pier71c55772018-04-17 17:25:22 +02002119 Multimap<ConnectPoint, List<ConnectPoint>> mcastTrees = HashMultimap.create();
Pier71c55772018-04-17 17:25:22 +02002120 Set<ConnectPoint> sources = mcastUtils.getSources(mcastIp);
Pier71c55772018-04-17 17:25:22 +02002121 if (sourcecp != null) {
2122 sources = sources.stream()
Piere99511d2018-04-19 16:47:06 +02002123 .filter(source -> source.equals(sourcecp)).collect(Collectors.toSet());
Pier71c55772018-04-17 17:25:22 +02002124 }
Pier71c55772018-04-17 17:25:22 +02002125 if (!sources.isEmpty()) {
2126 sources.forEach(source -> {
Pier71c55772018-04-17 17:25:22 +02002127 Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
2128 Set<DeviceId> visited = Sets.newHashSet();
2129 List<ConnectPoint> currentPath = Lists.newArrayList(source);
Charles Chan0b1dd7e2018-08-19 19:21:46 -07002130 mcastUtils.buildMcastPaths(mcastNextObjStore.asJavaMap(), source.deviceId(), visited, mcastPaths,
2131 currentPath, mcastIp, source);
Pier71c55772018-04-17 17:25:22 +02002132 mcastPaths.forEach(mcastTrees::put);
2133 });
2134 }
2135 return mcastTrees;
2136 }
2137
2138 /**
Pierdb27b8d2018-04-17 16:29:56 +02002139 * Return the leaders of the mcast groups.
2140 *
2141 * @param mcastIp the group ip
2142 * @return the mapping group-node
2143 */
2144 public Map<IpAddress, NodeId> getMcastLeaders(IpAddress mcastIp) {
2145 return mcastUtils.getMcastLeaders(mcastIp);
2146 }
Harshada Chaundkar9204f312019-07-02 16:01:24 +00002147
2148 /**
2149 * Returns the mcast filtering obj.
2150 *
2151 * @return the mapping group-node
2152 */
2153 public Map<DeviceId, List<McastFilteringObjStoreKey>> getMcastFilters() {
2154 Map<DeviceId, List<McastFilteringObjStoreKey>> mapping = Maps.newHashMap();
2155 Set<McastFilteringObjStoreKey> currentKeys = Sets.newHashSet(mcastFilteringObjStore);
2156 currentKeys.forEach(filteringObjStoreKey ->
2157 mapping.compute(filteringObjStoreKey.ingressCP().deviceId(), (k, v) -> {
2158 List<McastFilteringObjStoreKey> values = v;
2159 if (values == null) {
2160 values = Lists.newArrayList();
2161 }
2162 values.add(filteringObjStoreKey);
2163 return values;
2164 })
2165 );
2166 return mapping;
2167 }
Charles Chanc91c8782016-03-30 17:54:24 -07002168}