blob: 9a53e13e480ffd9597cdd15e4a30ec43417f9961 [file] [log] [blame]
Charles Chanc91c8782016-03-30 17:54:24 -07001/*
Pier Luigi69f774d2018-02-28 12:10:50 +01002 * Copyright 2018-present Open Networking Foundation
Charles Chanc91c8782016-03-30 17:54:24 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Pier Luigi69f774d2018-02-28 12:10:50 +010017package org.onosproject.segmentrouting.mcast;
Charles Chanc91c8782016-03-30 17:54:24 -070018
Pier Luigid29ca7c2018-02-28 17:24:03 +010019import com.google.common.cache.Cache;
20import com.google.common.cache.CacheBuilder;
21import com.google.common.cache.RemovalCause;
22import com.google.common.cache.RemovalNotification;
Pier7b657162018-03-27 11:29:42 -070023import com.google.common.collect.ImmutableList;
Charles Chanc91c8782016-03-30 17:54:24 -070024import com.google.common.collect.ImmutableSet;
25import com.google.common.collect.Lists;
Pier Luigi91573e12018-01-23 16:06:38 +010026import com.google.common.collect.Maps;
Charles Chanc91c8782016-03-30 17:54:24 -070027import com.google.common.collect.Sets;
Charles Chanc91c8782016-03-30 17:54:24 -070028import org.onlab.packet.IpAddress;
Charles Chanc91c8782016-03-30 17:54:24 -070029import org.onlab.packet.VlanId;
30import org.onlab.util.KryoNamespace;
Pierdb27b8d2018-04-17 16:29:56 +020031import org.onosproject.cluster.NodeId;
Charles Chanc91c8782016-03-30 17:54:24 -070032import org.onosproject.core.ApplicationId;
33import org.onosproject.core.CoreService;
Pier1f87aca2018-03-14 16:47:32 -070034import org.onosproject.mcast.api.McastEvent;
35import org.onosproject.mcast.api.McastRoute;
Pier7b657162018-03-27 11:29:42 -070036import org.onosproject.mcast.api.McastRouteData;
Pier1f87aca2018-03-14 16:47:32 -070037import org.onosproject.mcast.api.McastRouteUpdate;
Pier7b657162018-03-27 11:29:42 -070038import org.onosproject.net.HostId;
Charles Chanc91c8782016-03-30 17:54:24 -070039import org.onosproject.net.ConnectPoint;
40import org.onosproject.net.DeviceId;
41import org.onosproject.net.Link;
42import org.onosproject.net.Path;
43import org.onosproject.net.PortNumber;
Charles Chan72779502016-04-23 17:36:10 -070044import org.onosproject.net.flowobjective.DefaultObjectiveContext;
Charles Chanc91c8782016-03-30 17:54:24 -070045import org.onosproject.net.flowobjective.ForwardingObjective;
46import org.onosproject.net.flowobjective.NextObjective;
Charles Chan72779502016-04-23 17:36:10 -070047import org.onosproject.net.flowobjective.ObjectiveContext;
Pier1f87aca2018-03-14 16:47:32 -070048import org.onosproject.net.topology.LinkWeigher;
Pier Luigid8a15162018-02-15 16:33:08 +010049import org.onosproject.net.topology.Topology;
Charles Chanc91c8782016-03-30 17:54:24 -070050import org.onosproject.net.topology.TopologyService;
Pier1f87aca2018-03-14 16:47:32 -070051import org.onosproject.segmentrouting.SRLinkWeigher;
Pier Luigi69f774d2018-02-28 12:10:50 +010052import org.onosproject.segmentrouting.SegmentRoutingManager;
Charles Chan72779502016-04-23 17:36:10 -070053import org.onosproject.segmentrouting.storekey.McastStoreKey;
Charles Chanc91c8782016-03-30 17:54:24 -070054import org.onosproject.store.serializers.KryoNamespaces;
55import org.onosproject.store.service.ConsistentMap;
56import org.onosproject.store.service.Serializer;
Pier Luigi580fd8a2018-01-16 10:47:50 +010057import org.onosproject.store.service.Versioned;
Charles Chanc91c8782016-03-30 17:54:24 -070058import org.slf4j.Logger;
59import org.slf4j.LoggerFactory;
60
Pier Luigi35dab3f2018-01-25 16:16:02 +010061import java.time.Instant;
Charles Chanc91c8782016-03-30 17:54:24 -070062import java.util.Collection;
63import java.util.Collections;
Pier Luigi91573e12018-01-23 16:06:38 +010064import java.util.Comparator;
Charles Chanc91c8782016-03-30 17:54:24 -070065import java.util.List;
Charles Chan72779502016-04-23 17:36:10 -070066import java.util.Map;
Pier1f87aca2018-03-14 16:47:32 -070067import java.util.Map.Entry;
Charles Chanc91c8782016-03-30 17:54:24 -070068import java.util.Optional;
69import java.util.Set;
Pier Luigi35dab3f2018-01-25 16:16:02 +010070import java.util.concurrent.ScheduledExecutorService;
71import java.util.concurrent.TimeUnit;
72import java.util.concurrent.locks.Lock;
73import java.util.concurrent.locks.ReentrantLock;
Charles Chan72779502016-04-23 17:36:10 -070074import java.util.stream.Collectors;
75
Pier Luigi35dab3f2018-01-25 16:16:02 +010076import static java.util.concurrent.Executors.newScheduledThreadPool;
77import static org.onlab.util.Tools.groupedThreads;
Pier1f87aca2018-03-14 16:47:32 -070078
Pierdb27b8d2018-04-17 16:29:56 +020079import static org.onosproject.mcast.api.McastEvent.Type.ROUTE_ADDED;
Pier7b657162018-03-27 11:29:42 -070080import static org.onosproject.mcast.api.McastEvent.Type.ROUTE_REMOVED;
Pierdb27b8d2018-04-17 16:29:56 +020081import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_ADDED;
82import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_REMOVED;
83import static org.onosproject.mcast.api.McastEvent.Type.SINKS_ADDED;
84import static org.onosproject.mcast.api.McastEvent.Type.SINKS_REMOVED;
85
Pier979e61a2018-03-07 11:42:50 +010086import static org.onosproject.segmentrouting.mcast.McastRole.EGRESS;
87import static org.onosproject.segmentrouting.mcast.McastRole.INGRESS;
88import static org.onosproject.segmentrouting.mcast.McastRole.TRANSIT;
Charles Chanc91c8782016-03-30 17:54:24 -070089
90/**
Pier Luigi69f774d2018-02-28 12:10:50 +010091 * Handles Multicast related events.
Charles Chanc91c8782016-03-30 17:54:24 -070092 */
Charles Chan1eaf4802016-04-18 13:44:03 -070093public class McastHandler {
Pier7b657162018-03-27 11:29:42 -070094 // Logger instance
Charles Chan1eaf4802016-04-18 13:44:03 -070095 private static final Logger log = LoggerFactory.getLogger(McastHandler.class);
Pier7b657162018-03-27 11:29:42 -070096 // Reference to srManager and most used internal objects
Charles Chanc91c8782016-03-30 17:54:24 -070097 private final SegmentRoutingManager srManager;
Charles Chan82f19972016-05-17 13:13:55 -070098 private final TopologyService topologyService;
Pierdb27b8d2018-04-17 16:29:56 +020099 private final McastUtils mcastUtils;
Pier7b657162018-03-27 11:29:42 -0700100 // Internal store of the Mcast nextobjectives
Charles Chan72779502016-04-23 17:36:10 -0700101 private final ConsistentMap<McastStoreKey, NextObjective> mcastNextObjStore;
Pier7b657162018-03-27 11:29:42 -0700102 // Internal store of the Mcast roles
Charles Chan72779502016-04-23 17:36:10 -0700103 private final ConsistentMap<McastStoreKey, McastRole> mcastRoleStore;
104
Pier Luigid29ca7c2018-02-28 17:24:03 +0100105 // Wait time for the cache
106 private static final int WAIT_TIME_MS = 1000;
Pier7b657162018-03-27 11:29:42 -0700107
Pier Luigid29ca7c2018-02-28 17:24:03 +0100108 /**
109 * The mcastEventCache is implemented to avoid race condition by giving more time to the
110 * underlying subsystems to process previous calls.
111 */
112 private Cache<McastCacheKey, McastEvent> mcastEventCache = CacheBuilder.newBuilder()
113 .expireAfterWrite(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
114 .removalListener((RemovalNotification<McastCacheKey, McastEvent> notification) -> {
115 // Get group ip, sink and related event
116 IpAddress mcastIp = notification.getKey().mcastIp();
Pier7b657162018-03-27 11:29:42 -0700117 HostId sink = notification.getKey().sinkHost();
Pier Luigid29ca7c2018-02-28 17:24:03 +0100118 McastEvent mcastEvent = notification.getValue();
119 RemovalCause cause = notification.getCause();
120 log.debug("mcastEventCache removal event. group={}, sink={}, mcastEvent={}, cause={}",
121 mcastIp, sink, mcastEvent, cause);
122 // If it expires or it has been replaced, we deque the event
123 switch (notification.getCause()) {
124 case REPLACED:
125 case EXPIRED:
126 dequeueMcastEvent(mcastEvent);
127 break;
128 default:
129 break;
130 }
131 }).build();
132
133 private void enqueueMcastEvent(McastEvent mcastEvent) {
Pier7b657162018-03-27 11:29:42 -0700134 // Retrieve, currentData, prevData and the group
Pier1f87aca2018-03-14 16:47:32 -0700135 final McastRouteUpdate mcastRouteUpdate = mcastEvent.subject();
Pier7b657162018-03-27 11:29:42 -0700136 final McastRouteUpdate mcastRoutePrevUpdate = mcastEvent.prevSubject();
137 final IpAddress group = mcastRoutePrevUpdate.route().group();
Pier Luigid29ca7c2018-02-28 17:24:03 +0100138 // Let's create the keys of the cache
Pier7b657162018-03-27 11:29:42 -0700139 ImmutableSet.Builder<HostId> sinksBuilder = ImmutableSet.builder();
Pier1f87aca2018-03-14 16:47:32 -0700140 if (mcastEvent.type() == SOURCES_ADDED ||
141 mcastEvent.type() == SOURCES_REMOVED) {
142 // FIXME To be addressed with multiple sources support
Pier7b657162018-03-27 11:29:42 -0700143 sinksBuilder.addAll(Collections.emptySet());
144 } else if (mcastEvent.type() == SINKS_ADDED) {
145 // We need to process the host id one by one
146 mcastRouteUpdate.sinks().forEach(((hostId, connectPoints) -> {
147 // Get the previous locations and verify if there are changes
148 Set<ConnectPoint> prevConnectPoints = mcastRoutePrevUpdate.sinks().get(hostId);
149 Set<ConnectPoint> changes = Sets.difference(connectPoints, prevConnectPoints != null ?
150 prevConnectPoints : Collections.emptySet());
151 if (!changes.isEmpty()) {
152 sinksBuilder.add(hostId);
Pier1f87aca2018-03-14 16:47:32 -0700153 }
Pier7b657162018-03-27 11:29:42 -0700154 }));
155 } else if (mcastEvent.type() == SINKS_REMOVED) {
156 // We need to process the host id one by one
157 mcastRoutePrevUpdate.sinks().forEach(((hostId, connectPoints) -> {
158 // Get the current locations and verify if there are changes
159 Set<ConnectPoint> currentConnectPoints = mcastRouteUpdate.sinks().get(hostId);
160 Set<ConnectPoint> changes = Sets.difference(connectPoints, currentConnectPoints != null ?
161 currentConnectPoints : Collections.emptySet());
162 if (!changes.isEmpty()) {
163 sinksBuilder.add(hostId);
164 }
165 }));
166 } else if (mcastEvent.type() == ROUTE_REMOVED) {
167 // Current subject is null, just take the previous host ids
168 sinksBuilder.addAll(mcastRoutePrevUpdate.sinks().keySet());
Pier Luigid29ca7c2018-02-28 17:24:03 +0100169 }
170 // Push the elements in the cache
171 sinksBuilder.build().forEach(sink -> {
Pier1f87aca2018-03-14 16:47:32 -0700172 McastCacheKey cacheKey = new McastCacheKey(group, sink);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100173 mcastEventCache.put(cacheKey, mcastEvent);
174 });
175 }
176
177 private void dequeueMcastEvent(McastEvent mcastEvent) {
Pier7b657162018-03-27 11:29:42 -0700178 // Get new and old data
179 final McastRouteUpdate mcastUpdate = mcastEvent.subject();
180 final McastRouteUpdate mcastPrevUpdate = mcastEvent.prevSubject();
Pier Luigid29ca7c2018-02-28 17:24:03 +0100181 // Get source, mcast group
Pier1f87aca2018-03-14 16:47:32 -0700182 // FIXME To be addressed with multiple sources support
Pier7b657162018-03-27 11:29:42 -0700183 final ConnectPoint source = mcastPrevUpdate.sources()
Pier1f87aca2018-03-14 16:47:32 -0700184 .stream()
185 .findFirst()
186 .orElse(null);
Pier7b657162018-03-27 11:29:42 -0700187 IpAddress mcastIp = mcastPrevUpdate.route().group();
188 // Get all the previous sinks
189 Set<ConnectPoint> prevSinks = mcastPrevUpdate.sinks()
Pier1f87aca2018-03-14 16:47:32 -0700190 .values()
191 .stream()
192 .flatMap(Collection::stream)
193 .collect(Collectors.toSet());
Pier Luigid29ca7c2018-02-28 17:24:03 +0100194 // According to the event type let's call the proper method
195 switch (mcastEvent.type()) {
Pier1f87aca2018-03-14 16:47:32 -0700196 case SOURCES_ADDED:
197 // FIXME To be addressed with multiple sources support
198 // Get all the sinks
199 //Set<ConnectPoint> sinks = mcastRouteInfo.sinks();
200 // Compute the Mcast tree
201 //Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(source.deviceId(), sinks);
202 // Process the given sinks using the pre-computed paths
203 //mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink, mcastIp, paths));
Pier Luigid29ca7c2018-02-28 17:24:03 +0100204 break;
Pier1f87aca2018-03-14 16:47:32 -0700205 case SOURCES_REMOVED:
206 // FIXME To be addressed with multiple sources support
Pier Luigid29ca7c2018-02-28 17:24:03 +0100207 // Get old source
Pier1f87aca2018-03-14 16:47:32 -0700208 //ConnectPoint oldSource = mcastEvent.prevSubject().source().orElse(null);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100209 // Just the first cached element will be processed
Pier1f87aca2018-03-14 16:47:32 -0700210 //processSourceUpdatedInternal(mcastIp, source, oldSource);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100211 break;
212 case ROUTE_REMOVED:
213 // Process the route removed, just the first cached element will be processed
Pier7b657162018-03-27 11:29:42 -0700214 processRouteRemovedInternal(source, mcastIp);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100215 break;
Pier1f87aca2018-03-14 16:47:32 -0700216 case SINKS_ADDED:
Pier7b657162018-03-27 11:29:42 -0700217 // FIXME To be addressed with multiple sources support
218 processSinksAddedInternal(source, mcastIp,
219 mcastUpdate.sinks(), prevSinks);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100220 break;
Pier1f87aca2018-03-14 16:47:32 -0700221 case SINKS_REMOVED:
Pier7b657162018-03-27 11:29:42 -0700222 // FIXME To be addressed with multiple sources support
223 processSinksRemovedInternal(source, mcastIp,
Pier28164682018-04-17 15:50:43 +0200224 mcastUpdate.sinks(), mcastPrevUpdate.sinks());
Pier Luigid29ca7c2018-02-28 17:24:03 +0100225 break;
226 default:
227 break;
228 }
229 }
230
Pier Luigi35dab3f2018-01-25 16:16:02 +0100231 // Mcast lock to serialize local operations
232 private final Lock mcastLock = new ReentrantLock();
233
234 /**
235 * Acquires the lock used when making mcast changes.
236 */
237 private void mcastLock() {
238 mcastLock.lock();
239 }
240
241 /**
242 * Releases the lock used when making mcast changes.
243 */
244 private void mcastUnlock() {
245 mcastLock.unlock();
246 }
247
248 // Stability threshold for Mcast. Seconds
249 private static final long MCAST_STABLITY_THRESHOLD = 5;
250 // Last change done
251 private Instant lastMcastChange = Instant.now();
252
253 /**
254 * Determines if mcast in the network has been stable in the last
255 * MCAST_STABLITY_THRESHOLD seconds, by comparing the current time
256 * to the last mcast change timestamp.
257 *
258 * @return true if stable
259 */
260 private boolean isMcastStable() {
261 long last = (long) (lastMcastChange.toEpochMilli() / 1000.0);
262 long now = (long) (Instant.now().toEpochMilli() / 1000.0);
Saurav Das97241862018-02-14 14:14:54 -0800263 log.trace("Mcast stable since {}s", now - last);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100264 return (now - last) > MCAST_STABLITY_THRESHOLD;
265 }
266
267 // Verify interval for Mcast
268 private static final long MCAST_VERIFY_INTERVAL = 30;
269
270 // Executor for mcast bucket corrector
271 private ScheduledExecutorService executorService
Pier Luigid29ca7c2018-02-28 17:24:03 +0100272 = newScheduledThreadPool(1, groupedThreads("mcastWorker", "mcastWorker-%d", log));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100273
Charles Chan72779502016-04-23 17:36:10 -0700274 /**
Charles Chanc91c8782016-03-30 17:54:24 -0700275 * Constructs the McastEventHandler.
276 *
277 * @param srManager Segment Routing manager
278 */
Charles Chan1eaf4802016-04-18 13:44:03 -0700279 public McastHandler(SegmentRoutingManager srManager) {
Pier7b657162018-03-27 11:29:42 -0700280 ApplicationId coreAppId = srManager.coreService.getAppId(CoreService.CORE_APP_NAME);
Charles Chanc91c8782016-03-30 17:54:24 -0700281 this.srManager = srManager;
Charles Chanc91c8782016-03-30 17:54:24 -0700282 this.topologyService = srManager.topologyService;
Pier7b657162018-03-27 11:29:42 -0700283 KryoNamespace.Builder mcastKryo = new KryoNamespace.Builder()
Charles Chanc91c8782016-03-30 17:54:24 -0700284 .register(KryoNamespaces.API)
Charles Chan72779502016-04-23 17:36:10 -0700285 .register(McastStoreKey.class)
286 .register(McastRole.class);
Pier7b657162018-03-27 11:29:42 -0700287 mcastNextObjStore = srManager.storageService
Charles Chan72779502016-04-23 17:36:10 -0700288 .<McastStoreKey, NextObjective>consistentMapBuilder()
Charles Chanc91c8782016-03-30 17:54:24 -0700289 .withName("onos-mcast-nextobj-store")
Charles Chan4922a172016-05-23 16:45:45 -0700290 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-NextObj")))
Charles Chanc91c8782016-03-30 17:54:24 -0700291 .build();
Pier7b657162018-03-27 11:29:42 -0700292 mcastRoleStore = srManager.storageService
Charles Chan72779502016-04-23 17:36:10 -0700293 .<McastStoreKey, McastRole>consistentMapBuilder()
294 .withName("onos-mcast-role-store")
Charles Chan4922a172016-05-23 16:45:45 -0700295 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-Role")))
Charles Chan72779502016-04-23 17:36:10 -0700296 .build();
Pier7b657162018-03-27 11:29:42 -0700297 // Let's create McastUtils object
298 mcastUtils = new McastUtils(srManager, coreAppId, log);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100299 // Init the executor service and the buckets corrector
300 executorService.scheduleWithFixedDelay(new McastBucketCorrector(), 10,
Pier7b657162018-03-27 11:29:42 -0700301 MCAST_VERIFY_INTERVAL, TimeUnit.SECONDS);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100302 // Schedule the clean up, this will allow the processing of the expired events
303 executorService.scheduleAtFixedRate(mcastEventCache::cleanUp, 0,
304 WAIT_TIME_MS, TimeUnit.MILLISECONDS);
Charles Chan72779502016-04-23 17:36:10 -0700305 }
306
307 /**
308 * Read initial multicast from mcast store.
309 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100310 public void init() {
Pier7b657162018-03-27 11:29:42 -0700311 lastMcastChange = Instant.now();
312 mcastLock();
313 try {
314 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
Pierdb27b8d2018-04-17 16:29:56 +0200315 // Verify leadership on the operation
316 if (!mcastUtils.isLeader(mcastRoute.group())) {
317 log.debug("Skip {} due to lack of leadership", mcastRoute.group());
318 return;
319 }
Pier7b657162018-03-27 11:29:42 -0700320 // FIXME To be addressed with multiple sources support
321 ConnectPoint source = srManager.multicastRouteService.sources(mcastRoute)
322 .stream()
323 .findFirst()
324 .orElse(null);
325 // Get all the sinks and process them
326 McastRouteData mcastRouteData = srManager.multicastRouteService.routeData(mcastRoute);
327 Set<ConnectPoint> sinks = processSinksToBeAdded(source, mcastRoute.group(), mcastRouteData.sinks());
328 // Filter out all the working sinks, we do not want to move them
329 sinks = sinks.stream()
330 .filter(sink -> {
331 McastStoreKey mcastKey = new McastStoreKey(mcastRoute.group(), sink.deviceId());
332 Versioned<NextObjective> verMcastNext = mcastNextObjStore.get(mcastKey);
333 return verMcastNext == null ||
334 !mcastUtils.getPorts(verMcastNext.value().next()).contains(sink.port());
335 })
336 .collect(Collectors.toSet());
337 // Compute the Mcast tree
338 Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(source.deviceId(), sinks);
339 // Process the given sinks using the pre-computed paths
340 mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink,
341 mcastRoute.group(), paths));
342 });
343 } finally {
344 mcastUnlock();
345 }
Charles Chanc91c8782016-03-30 17:54:24 -0700346 }
347
348 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +0100349 * Clean up when deactivating the application.
350 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100351 public void terminate() {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100352 executorService.shutdown();
353 }
354
355 /**
Pier Luigid29ca7c2018-02-28 17:24:03 +0100356 * Processes the SOURCE_ADDED, SOURCE_UPDATED, SINK_ADDED,
357 * SINK_REMOVED and ROUTE_REMOVED events.
Charles Chanc91c8782016-03-30 17:54:24 -0700358 *
359 * @param event McastEvent with SOURCE_ADDED type
360 */
Pier Luigid29ca7c2018-02-28 17:24:03 +0100361 public void processMcastEvent(McastEvent event) {
362 log.info("process {}", event);
Pierdb27b8d2018-04-17 16:29:56 +0200363 // If it is a route added, we do not enqueue
364 if (event.type() == ROUTE_ADDED) {
365 // We need just to elect a leader
366 processRouteAddedInternal(event.subject().route().group());
367 } else {
368 // Just enqueue for now
369 enqueueMcastEvent(event);
370 }
Pier Luigi6786b922018-02-02 16:19:11 +0100371 }
372
Pierdb27b8d2018-04-17 16:29:56 +0200373
Pier Luigi6786b922018-02-02 16:19:11 +0100374 /**
Pierdb27b8d2018-04-17 16:29:56 +0200375 * Process the ROUTE_ADDED event.
Pier Luigie80d6b42018-02-26 12:31:38 +0100376 *
Pierdb27b8d2018-04-17 16:29:56 +0200377 * @param mcastIp the group address
Pier Luigie80d6b42018-02-26 12:31:38 +0100378 */
Pierdb27b8d2018-04-17 16:29:56 +0200379 private void processRouteAddedInternal(IpAddress mcastIp) {
Pier Luigie80d6b42018-02-26 12:31:38 +0100380 lastMcastChange = Instant.now();
381 mcastLock();
382 try {
Pierdb27b8d2018-04-17 16:29:56 +0200383 log.debug("Processing route added for group {}", mcastIp);
384 // Just elect a new leader
385 mcastUtils.isLeader(mcastIp);
Pier Luigie80d6b42018-02-26 12:31:38 +0100386 } finally {
387 mcastUnlock();
388 }
389 }
390
391 /**
Pier Luigi6786b922018-02-02 16:19:11 +0100392 * Removes the entire mcast tree related to this group.
393 *
394 * @param mcastIp multicast group IP address
395 */
396 private void processRouteRemovedInternal(ConnectPoint source, IpAddress mcastIp) {
397 lastMcastChange = Instant.now();
398 mcastLock();
399 try {
Pier Luigie80d6b42018-02-26 12:31:38 +0100400 log.debug("Processing route removed for group {}", mcastIp);
Pierdb27b8d2018-04-17 16:29:56 +0200401 // Verify leadership on the operation
402 if (!mcastUtils.isLeader(mcastIp)) {
403 log.debug("Skip {} due to lack of leadership", mcastIp);
404 mcastUtils.withdrawLeader(mcastIp);
405 return;
406 }
Pier Luigi6786b922018-02-02 16:19:11 +0100407
408 // Find out the ingress, transit and egress device of the affected group
Pier979e61a2018-03-07 11:42:50 +0100409 DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
Pier Luigi6786b922018-02-02 16:19:11 +0100410 .stream().findAny().orElse(null);
Pier1a7e0c02018-03-12 15:00:54 -0700411 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT);
Pier979e61a2018-03-07 11:42:50 +0100412 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS);
Pier Luigi6786b922018-02-02 16:19:11 +0100413
Pier1a7e0c02018-03-12 15:00:54 -0700414 // If there are no egress devices, sinks could be only on the ingress
Pier Luigi6786b922018-02-02 16:19:11 +0100415 if (!egressDevices.isEmpty()) {
416 egressDevices.forEach(
Pier7b657162018-03-27 11:29:42 -0700417 deviceId -> removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null))
Pier Luigi6786b922018-02-02 16:19:11 +0100418 );
419 }
Pier1a7e0c02018-03-12 15:00:54 -0700420 // Transit could be empty if sinks are on the ingress
421 if (!transitDevices.isEmpty()) {
422 transitDevices.forEach(
Pier7b657162018-03-27 11:29:42 -0700423 deviceId -> removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null))
Pier1a7e0c02018-03-12 15:00:54 -0700424 );
Pier Luigi6786b922018-02-02 16:19:11 +0100425 }
426 // Ingress device should be not null
427 if (ingressDevice != null) {
Pier7b657162018-03-27 11:29:42 -0700428 removeGroupFromDevice(ingressDevice, mcastIp, mcastUtils.assignedVlan(source));
Pier Luigi6786b922018-02-02 16:19:11 +0100429 }
Pier Luigi6786b922018-02-02 16:19:11 +0100430 } finally {
431 mcastUnlock();
432 }
433 }
434
Pier7b657162018-03-27 11:29:42 -0700435
436 /**
437 * Process sinks to be removed.
438 *
439 * @param source the source connect point
440 * @param mcastIp the ip address of the group
441 * @param newSinks the new sinks to be processed
Pier28164682018-04-17 15:50:43 +0200442 * @param prevSinks the previous sinks
Pier7b657162018-03-27 11:29:42 -0700443 */
444 private void processSinksRemovedInternal(ConnectPoint source, IpAddress mcastIp,
445 Map<HostId, Set<ConnectPoint>> newSinks,
Pier28164682018-04-17 15:50:43 +0200446 Map<HostId, Set<ConnectPoint>> prevSinks) {
Pier7b657162018-03-27 11:29:42 -0700447 lastMcastChange = Instant.now();
448 mcastLock();
Pier7b657162018-03-27 11:29:42 -0700449 try {
Pierdb27b8d2018-04-17 16:29:56 +0200450 // Verify leadership on the operation
451 if (!mcastUtils.isLeader(mcastIp)) {
452 log.debug("Skip {} due to lack of leadership", mcastIp);
453 return;
454 }
Pier28164682018-04-17 15:50:43 +0200455 // Remove the previous ones
456 Set<ConnectPoint> sinksToBeRemoved = processSinksToBeRemoved(mcastIp, prevSinks,
457 newSinks);
458 sinksToBeRemoved.forEach(sink -> processSinkRemovedInternal(source, sink, mcastIp));
Pier7b657162018-03-27 11:29:42 -0700459 // Recover the dual-homed sinks
Pier28164682018-04-17 15:50:43 +0200460 Set<ConnectPoint> sinksToBeRecovered = processSinksToBeRecovered(mcastIp, newSinks,
461 prevSinks);
Pier7b657162018-03-27 11:29:42 -0700462 sinksToBeRecovered.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null));
Pier7b657162018-03-27 11:29:42 -0700463 } finally {
464 mcastUnlock();
Pier7b657162018-03-27 11:29:42 -0700465 }
466 }
467
Pier Luigi6786b922018-02-02 16:19:11 +0100468 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +0100469 * Removes a path from source to sink for given multicast group.
470 *
471 * @param source connect point of the multicast source
472 * @param sink connection point of the multicast sink
473 * @param mcastIp multicast group IP address
474 */
475 private void processSinkRemovedInternal(ConnectPoint source, ConnectPoint sink,
Pier7b657162018-03-27 11:29:42 -0700476 IpAddress mcastIp) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100477 lastMcastChange = Instant.now();
478 mcastLock();
479 try {
Pier7b657162018-03-27 11:29:42 -0700480 boolean isLast;
Pier Luigi35dab3f2018-01-25 16:16:02 +0100481 // When source and sink are on the same device
482 if (source.deviceId().equals(sink.deviceId())) {
483 // Source and sink are on even the same port. There must be something wrong.
484 if (source.port().equals(sink.port())) {
485 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
486 mcastIp, sink, source);
487 return;
488 }
Pier7b657162018-03-27 11:29:42 -0700489 isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
Pier Luigi92e69be2018-03-02 12:53:37 +0100490 if (isLast) {
491 mcastRoleStore.remove(new McastStoreKey(mcastIp, sink.deviceId()));
492 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100493 return;
494 }
Charles Chanc91c8782016-03-30 17:54:24 -0700495
Pier Luigi35dab3f2018-01-25 16:16:02 +0100496 // Process the egress device
Pier7b657162018-03-27 11:29:42 -0700497 isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100498 if (isLast) {
499 mcastRoleStore.remove(new McastStoreKey(mcastIp, sink.deviceId()));
500 }
501
502 // If this is the last sink on the device, also update upstream
Pier1f87aca2018-03-14 16:47:32 -0700503 Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(),
504 mcastIp, null);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100505 if (mcastPath.isPresent()) {
506 List<Link> links = Lists.newArrayList(mcastPath.get().links());
507 Collections.reverse(links);
508 for (Link link : links) {
509 if (isLast) {
510 isLast = removePortFromDevice(
511 link.src().deviceId(),
512 link.src().port(),
513 mcastIp,
Pier7b657162018-03-27 11:29:42 -0700514 mcastUtils.assignedVlan(link.src().deviceId().equals(source.deviceId()) ?
515 source : null)
Pier Luigi35dab3f2018-01-25 16:16:02 +0100516 );
Pier Luigi92e69be2018-03-02 12:53:37 +0100517 if (isLast) {
518 mcastRoleStore.remove(new McastStoreKey(mcastIp, link.src().deviceId()));
519 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100520 }
Charles Chanc91c8782016-03-30 17:54:24 -0700521 }
522 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100523 } finally {
524 mcastUnlock();
Charles Chanc91c8782016-03-30 17:54:24 -0700525 }
526 }
527
Pier7b657162018-03-27 11:29:42 -0700528
529 /**
530 * Process sinks to be added.
531 *
532 * @param source the source connect point
533 * @param mcastIp the group IP
534 * @param newSinks the new sinks to be processed
535 * @param allPrevSinks all previous sinks
536 */
537 private void processSinksAddedInternal(ConnectPoint source, IpAddress mcastIp,
538 Map<HostId, Set<ConnectPoint>> newSinks,
539 Set<ConnectPoint> allPrevSinks) {
540 lastMcastChange = Instant.now();
541 mcastLock();
542 try {
Pierdb27b8d2018-04-17 16:29:56 +0200543 // Verify leadership on the operation
544 if (!mcastUtils.isLeader(mcastIp)) {
545 log.debug("Skip {} due to lack of leadership", mcastIp);
546 return;
547 }
Pier7b657162018-03-27 11:29:42 -0700548 // Get the only sinks to be processed (new ones)
549 Set<ConnectPoint> sinksToBeAdded = processSinksToBeAdded(source, mcastIp, newSinks);
550 // Install new sinks
551 sinksToBeAdded = Sets.difference(sinksToBeAdded, allPrevSinks);
552 sinksToBeAdded.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null));
553 } finally {
554 mcastUnlock();
555 }
556 }
557
Charles Chanc91c8782016-03-30 17:54:24 -0700558 /**
559 * Establishes a path from source to sink for given multicast group.
560 *
561 * @param source connect point of the multicast source
562 * @param sink connection point of the multicast sink
563 * @param mcastIp multicast group IP address
564 */
565 private void processSinkAddedInternal(ConnectPoint source, ConnectPoint sink,
Pier7b657162018-03-27 11:29:42 -0700566 IpAddress mcastIp, List<Path> allPaths) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100567 lastMcastChange = Instant.now();
568 mcastLock();
569 try {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100570 // Process the ingress device
Pier7b657162018-03-27 11:29:42 -0700571 mcastUtils.addFilterToDevice(source.deviceId(), source.port(),
572 mcastUtils.assignedVlan(source), mcastIp, INGRESS);
Charles Chan72779502016-04-23 17:36:10 -0700573
Pier Luigi35dab3f2018-01-25 16:16:02 +0100574 // When source and sink are on the same device
575 if (source.deviceId().equals(sink.deviceId())) {
576 // Source and sink are on even the same port. There must be something wrong.
577 if (source.port().equals(sink.port())) {
578 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
579 mcastIp, sink, source);
580 return;
581 }
Pier7b657162018-03-27 11:29:42 -0700582 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
Pier979e61a2018-03-07 11:42:50 +0100583 mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()), INGRESS);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100584 return;
585 }
Charles Chan72779502016-04-23 17:36:10 -0700586
Pier Luigi35dab3f2018-01-25 16:16:02 +0100587 // Find a path. If present, create/update groups and flows for each hop
Pier1f87aca2018-03-14 16:47:32 -0700588 Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(),
589 mcastIp, allPaths);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100590 if (mcastPath.isPresent()) {
591 List<Link> links = mcastPath.get().links();
Charles Chan72779502016-04-23 17:36:10 -0700592
Pier1a7e0c02018-03-12 15:00:54 -0700593 // Setup mcast role for ingress
594 mcastRoleStore.put(new McastStoreKey(mcastIp, source.deviceId()),
595 INGRESS);
596
597 // Setup properly the transit
Pier Luigi35dab3f2018-01-25 16:16:02 +0100598 links.forEach(link -> {
599 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
Pier7b657162018-03-27 11:29:42 -0700600 mcastUtils.assignedVlan(link.src().deviceId()
601 .equals(source.deviceId()) ? source : null));
602 mcastUtils.addFilterToDevice(link.dst().deviceId(), link.dst().port(),
603 mcastUtils.assignedVlan(null), mcastIp, null);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100604 });
605
Pier1a7e0c02018-03-12 15:00:54 -0700606 // Setup mcast role for the transit
607 links.stream()
608 .filter(link -> !link.dst().deviceId().equals(sink.deviceId()))
609 .forEach(link -> mcastRoleStore.put(new McastStoreKey(mcastIp, link.dst().deviceId()),
610 TRANSIT));
611
Pier Luigi35dab3f2018-01-25 16:16:02 +0100612 // Process the egress device
Pier7b657162018-03-27 11:29:42 -0700613 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
Pier1a7e0c02018-03-12 15:00:54 -0700614 // Setup mcast role for egress
Pier Luigi35dab3f2018-01-25 16:16:02 +0100615 mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()),
Pier979e61a2018-03-07 11:42:50 +0100616 EGRESS);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100617 } else {
618 log.warn("Unable to find a path from {} to {}. Abort sinkAdded",
619 source.deviceId(), sink.deviceId());
620 }
621 } finally {
622 mcastUnlock();
Charles Chanc91c8782016-03-30 17:54:24 -0700623 }
624 }
625
626 /**
Charles Chan72779502016-04-23 17:36:10 -0700627 * Processes the LINK_DOWN event.
628 *
629 * @param affectedLink Link that is going down
630 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100631 public void processLinkDown(Link affectedLink) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100632 lastMcastChange = Instant.now();
633 mcastLock();
634 try {
635 // Get groups affected by the link down event
636 getAffectedGroups(affectedLink).forEach(mcastIp -> {
637 // TODO Optimize when the group editing is in place
638 log.debug("Processing link down {} for group {}",
639 affectedLink, mcastIp);
Pierdb27b8d2018-04-17 16:29:56 +0200640 // Verify leadership on the operation
641 if (!mcastUtils.isLeader(mcastIp)) {
642 log.debug("Skip {} due to lack of leadership", mcastIp);
643 return;
644 }
Pier Luigi580fd8a2018-01-16 10:47:50 +0100645
Pier Luigi35dab3f2018-01-25 16:16:02 +0100646 // Find out the ingress, transit and egress device of affected group
Pier979e61a2018-03-07 11:42:50 +0100647 DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
Pier Luigi35dab3f2018-01-25 16:16:02 +0100648 .stream().findAny().orElse(null);
Pier1a7e0c02018-03-12 15:00:54 -0700649 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT);
Pier979e61a2018-03-07 11:42:50 +0100650 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS);
Pier7b657162018-03-27 11:29:42 -0700651 ConnectPoint source = mcastUtils.getSource(mcastIp);
Charles Chana8f9dee2016-05-16 18:44:13 -0700652
Pier1a7e0c02018-03-12 15:00:54 -0700653 // Do not proceed if ingress device or source of this group are missing
654 // If sinks are in other leafs, we have ingress, transit, egress, and source
655 // If sinks are in the same leaf, we have just ingress and source
656 if (ingressDevice == null || source == null) {
657 log.warn("Missing ingress {} or source {} for group {}",
658 ingressDevice, source, mcastIp);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100659 return;
Charles Chan72779502016-04-23 17:36:10 -0700660 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100661
Pier Luigi35dab3f2018-01-25 16:16:02 +0100662 // Remove entire transit
Pier1a7e0c02018-03-12 15:00:54 -0700663 transitDevices.forEach(transitDevice ->
Pier7b657162018-03-27 11:29:42 -0700664 removeGroupFromDevice(transitDevice, mcastIp,
665 mcastUtils.assignedVlan(null)));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100666
Pier1a7e0c02018-03-12 15:00:54 -0700667 // Remove transit-facing ports on the ingress device
668 removeIngressTransitPorts(mcastIp, ingressDevice, source);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100669
Pier7b657162018-03-27 11:29:42 -0700670 // TODO create a shared procedure with DEVICE_DOWN
Pier1f87aca2018-03-14 16:47:32 -0700671 // Compute mcast tree for the the egress devices
672 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(ingressDevice, egressDevices);
673
Pier7b657162018-03-27 11:29:42 -0700674 // We have to verify, if there are egresses without paths
675 Set<DeviceId> notRecovered = Sets.newHashSet();
Pier1f87aca2018-03-14 16:47:32 -0700676 mcastTree.forEach((egressDevice, paths) -> {
Pier7b657162018-03-27 11:29:42 -0700677 // Let's check if there is at least a path
Pier1f87aca2018-03-14 16:47:32 -0700678 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
679 mcastIp, paths);
Pier7b657162018-03-27 11:29:42 -0700680 // No paths, we have to try with alternative location
681 if (!mcastPath.isPresent()) {
682 notRecovered.add(egressDevice);
683 // We were not able to find an alternative path for this egress
Pier Luigi35dab3f2018-01-25 16:16:02 +0100684 log.warn("Fail to recover egress device {} from link failure {}",
685 egressDevice, affectedLink);
Pier7b657162018-03-27 11:29:42 -0700686 removeGroupFromDevice(egressDevice, mcastIp,
687 mcastUtils.assignedVlan(null));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100688 }
689 });
Pier7b657162018-03-27 11:29:42 -0700690
691 // Fast path, we can recover all the locations
692 if (notRecovered.isEmpty()) {
693 // Construct a new path for each egress device
694 mcastTree.forEach((egressDevice, paths) -> {
695 // We try to enforce the sinks path on the mcast tree
696 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
697 mcastIp, paths);
698 // If a path is present, let's install it
699 if (mcastPath.isPresent()) {
700 installPath(mcastIp, source, mcastPath.get());
701 }
702 });
703 } else {
704 // Let's try to recover using alternate
705 recoverSinks(egressDevices, notRecovered, mcastIp,
706 ingressDevice, source, true);
707 }
Charles Chan72779502016-04-23 17:36:10 -0700708 });
Pier Luigi35dab3f2018-01-25 16:16:02 +0100709 } finally {
710 mcastUnlock();
711 }
Charles Chan72779502016-04-23 17:36:10 -0700712 }
713
714 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +0100715 * Process the DEVICE_DOWN event.
716 *
717 * @param deviceDown device going down
718 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100719 public void processDeviceDown(DeviceId deviceDown) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100720 lastMcastChange = Instant.now();
721 mcastLock();
722 try {
723 // Get the mcast groups affected by the device going down
724 getAffectedGroups(deviceDown).forEach(mcastIp -> {
725 // TODO Optimize when the group editing is in place
726 log.debug("Processing device down {} for group {}",
727 deviceDown, mcastIp);
Pierdb27b8d2018-04-17 16:29:56 +0200728 // Verify leadership on the operation
729 if (!mcastUtils.isLeader(mcastIp)) {
730 log.debug("Skip {} due to lack of leadership", mcastIp);
731 return;
732 }
Pier Luigi580fd8a2018-01-16 10:47:50 +0100733
Pier Luigi35dab3f2018-01-25 16:16:02 +0100734 // Find out the ingress, transit and egress device of affected group
Pier979e61a2018-03-07 11:42:50 +0100735 DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
Pier Luigi35dab3f2018-01-25 16:16:02 +0100736 .stream().findAny().orElse(null);
Pier1a7e0c02018-03-12 15:00:54 -0700737 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT);
Pier979e61a2018-03-07 11:42:50 +0100738 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS);
Pier7b657162018-03-27 11:29:42 -0700739 ConnectPoint source = mcastUtils.getSource(mcastIp);
Pier Luigi580fd8a2018-01-16 10:47:50 +0100740
Pier Luigi35dab3f2018-01-25 16:16:02 +0100741 // Do not proceed if ingress device or source of this group are missing
742 // If sinks are in other leafs, we have ingress, transit, egress, and source
743 // If sinks are in the same leaf, we have just ingress and source
744 if (ingressDevice == null || source == null) {
745 log.warn("Missing ingress {} or source {} for group {}",
746 ingressDevice, source, mcastIp);
Pier Luigi580fd8a2018-01-16 10:47:50 +0100747 return;
748 }
Pier Luigi580fd8a2018-01-16 10:47:50 +0100749
Pier Luigi35dab3f2018-01-25 16:16:02 +0100750 // If it exists, we have to remove it in any case
Pier1a7e0c02018-03-12 15:00:54 -0700751 if (!transitDevices.isEmpty()) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100752 // Remove entire transit
Pier1a7e0c02018-03-12 15:00:54 -0700753 transitDevices.forEach(transitDevice ->
Pier7b657162018-03-27 11:29:42 -0700754 removeGroupFromDevice(transitDevice, mcastIp,
755 mcastUtils.assignedVlan(null)));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100756 }
757 // If the ingress is down
758 if (ingressDevice.equals(deviceDown)) {
759 // Remove entire ingress
Pier7b657162018-03-27 11:29:42 -0700760 removeGroupFromDevice(ingressDevice, mcastIp, mcastUtils.assignedVlan(source));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100761 // If other sinks different from the ingress exist
762 if (!egressDevices.isEmpty()) {
763 // Remove all the remaining egress
764 egressDevices.forEach(
Pier7b657162018-03-27 11:29:42 -0700765 egressDevice -> removeGroupFromDevice(egressDevice, mcastIp,
766 mcastUtils.assignedVlan(null))
Pier Luigi35dab3f2018-01-25 16:16:02 +0100767 );
Pier Luigi580fd8a2018-01-16 10:47:50 +0100768 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100769 } else {
770 // Egress or transit could be down at this point
Pier1a7e0c02018-03-12 15:00:54 -0700771 // Get the ingress-transit ports if they exist
772 removeIngressTransitPorts(mcastIp, ingressDevice, source);
773
Pier Luigi35dab3f2018-01-25 16:16:02 +0100774 // One of the egress device is down
775 if (egressDevices.contains(deviceDown)) {
776 // Remove entire device down
Pier7b657162018-03-27 11:29:42 -0700777 removeGroupFromDevice(deviceDown, mcastIp, mcastUtils.assignedVlan(null));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100778 // Remove the device down from egress
779 egressDevices.remove(deviceDown);
780 // If there are no more egress and ingress does not have sinks
781 if (egressDevices.isEmpty() && !hasSinks(ingressDevice, mcastIp)) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100782 // We have done
783 return;
784 }
785 }
Pier1f87aca2018-03-14 16:47:32 -0700786
787 // Compute mcast tree for the the egress devices
788 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(ingressDevice, egressDevices);
789
Pier7b657162018-03-27 11:29:42 -0700790 // We have to verify, if there are egresses without paths
791 Set<DeviceId> notRecovered = Sets.newHashSet();
Pier1f87aca2018-03-14 16:47:32 -0700792 mcastTree.forEach((egressDevice, paths) -> {
Pier7b657162018-03-27 11:29:42 -0700793 // Let's check if there is at least a path
Pier1f87aca2018-03-14 16:47:32 -0700794 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
Pier7b657162018-03-27 11:29:42 -0700795 mcastIp, paths);
796 // No paths, we have to try with alternative location
797 if (!mcastPath.isPresent()) {
798 notRecovered.add(egressDevice);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100799 // We were not able to find an alternative path for this egress
800 log.warn("Fail to recover egress device {} from device down {}",
801 egressDevice, deviceDown);
Pier7b657162018-03-27 11:29:42 -0700802 removeGroupFromDevice(egressDevice, mcastIp, mcastUtils.assignedVlan(null));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100803 }
804 });
Pier7b657162018-03-27 11:29:42 -0700805
806 // Fast path, we can recover all the locations
807 if (notRecovered.isEmpty()) {
808 // Construct a new path for each egress device
809 mcastTree.forEach((egressDevice, paths) -> {
810 // We try to enforce the sinks path on the mcast tree
811 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
812 mcastIp, paths);
813 // If a path is present, let's install it
814 if (mcastPath.isPresent()) {
815 installPath(mcastIp, source, mcastPath.get());
816 }
817 });
818 } else {
819 // Let's try to recover using alternate
820 recoverSinks(egressDevices, notRecovered, mcastIp,
821 ingressDevice, source, false);
822 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100823 }
824 });
825 } finally {
826 mcastUnlock();
827 }
Pier Luigi580fd8a2018-01-16 10:47:50 +0100828 }
829
830 /**
Pier7b657162018-03-27 11:29:42 -0700831 * Try to recover sinks using alternate locations.
832 *
833 * @param egressDevices the original egress devices
834 * @param notRecovered the devices not recovered
835 * @param mcastIp the group address
836 * @param ingressDevice the ingress device
837 * @param source the source connect point
838 * @param isLinkFailure true if it is a link failure, otherwise false
839 */
840 private void recoverSinks(Set<DeviceId> egressDevices, Set<DeviceId> notRecovered,
841 IpAddress mcastIp, DeviceId ingressDevice, ConnectPoint source,
842 boolean isLinkFailure) {
843 // Recovered devices
844 Set<DeviceId> recovered = Sets.difference(egressDevices, notRecovered);
845 // Total affected sinks
846 Set<ConnectPoint> totalAffectedSinks = Sets.newHashSet();
847 // Total sinks
848 Set<ConnectPoint> totalSinks = Sets.newHashSet();
849 // Let's compute all the affected sinks and all the sinks
850 notRecovered.forEach(deviceId -> {
851 totalAffectedSinks.addAll(
852 mcastUtils.getAffectedSinks(deviceId, mcastIp)
853 .values()
854 .stream()
855 .flatMap(Collection::stream)
856 .filter(connectPoint -> connectPoint.deviceId().equals(deviceId))
857 .collect(Collectors.toSet())
858 );
859 totalSinks.addAll(
860 mcastUtils.getAffectedSinks(deviceId, mcastIp)
861 .values()
862 .stream()
863 .flatMap(Collection::stream)
864 .collect(Collectors.toSet())
865 );
866 });
867
868 // Sinks to be added
869 Set<ConnectPoint> sinksToBeAdded = Sets.difference(totalSinks, totalAffectedSinks);
870 // New egress devices, filtering out the source
871 Set<DeviceId> newEgressDevice = sinksToBeAdded.stream()
872 .map(ConnectPoint::deviceId)
873 .collect(Collectors.toSet());
874 // Let's add the devices recovered from the previous round
875 newEgressDevice.addAll(recovered);
876 // Let's do a copy of the new egresses and filter out the source
877 Set<DeviceId> copyNewEgressDevice = ImmutableSet.copyOf(newEgressDevice);
878 newEgressDevice = newEgressDevice.stream()
879 .filter(deviceId -> !deviceId.equals(ingressDevice))
880 .collect(Collectors.toSet());
881
882 // Re-compute mcast tree for the the egress devices
883 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(ingressDevice, newEgressDevice);
884 // if the source was originally in the new locations, add new sinks
885 if (copyNewEgressDevice.contains(ingressDevice)) {
886 sinksToBeAdded.stream()
887 .filter(connectPoint -> connectPoint.deviceId().equals(ingressDevice))
888 .forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, ImmutableList.of()));
889 }
890
891 // Construct a new path for each egress device
892 mcastTree.forEach((egressDevice, paths) -> {
893 // We try to enforce the sinks path on the mcast tree
894 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
895 mcastIp, paths);
896 // If a path is present, let's install it
897 if (mcastPath.isPresent()) {
898 // Using recovery procedure
899 if (recovered.contains(egressDevice)) {
900 installPath(mcastIp, source, mcastPath.get());
901 } else {
902 // otherwise we need to threat as new sink
903 sinksToBeAdded.stream()
904 .filter(connectPoint -> connectPoint.deviceId().equals(egressDevice))
905 .forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, paths));
906 }
907 } else {
908 // We were not able to find an alternative path for this egress
909 log.warn("Fail to recover egress device {} from {} failure",
910 egressDevice, isLinkFailure ? "Link" : "Device");
911 removeGroupFromDevice(egressDevice, mcastIp, mcastUtils.assignedVlan(null));
912 }
913 });
914
915 }
916
917 /**
Pier28164682018-04-17 15:50:43 +0200918 * Process all the sinks related to a mcast group and return
919 * the ones to be removed.
920 *
921 * @param mcastIp the group address
922 * @param prevsinks the previous sinks to be evaluated
923 * @param newSinks the new sinks to be evaluted
924 * @return the set of the sinks to be removed
925 */
926 private Set<ConnectPoint> processSinksToBeRemoved(IpAddress mcastIp,
927 Map<HostId, Set<ConnectPoint>> prevsinks,
928 Map<HostId, Set<ConnectPoint>> newSinks) {
929 // Iterate over the sinks in order to build the set
930 // of the connect points to be removed from this group
931 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
932 prevsinks.forEach(((hostId, connectPoints) -> {
933 // We have to check with the existing flows
934 ConnectPoint sinkToBeProcessed = connectPoints.stream()
935 .filter(connectPoint -> isSink(mcastIp, connectPoint))
936 .findFirst().orElse(null);
937 if (sinkToBeProcessed != null) {
938 // If the host has been removed or location has been removed
939 if (!newSinks.containsKey(hostId) ||
940 !newSinks.get(hostId).contains(sinkToBeProcessed)) {
941 sinksToBeProcessed.add(sinkToBeProcessed);
942 }
943 }
944 }));
945 // We have done, return the set
946 return sinksToBeProcessed;
947 }
948
949 /**
Pier7b657162018-03-27 11:29:42 -0700950 * Process new locations and return the set of sinks to be added
951 * in the context of the recovery.
952 *
Pier28164682018-04-17 15:50:43 +0200953 * @param newSinks the remaining sinks
954 * @param prevSinks the previous sinks
Pier7b657162018-03-27 11:29:42 -0700955 * @return the set of the sinks to be processed
956 */
957 private Set<ConnectPoint> processSinksToBeRecovered(IpAddress mcastIp,
Pier28164682018-04-17 15:50:43 +0200958 Map<HostId, Set<ConnectPoint>> newSinks,
959 Map<HostId, Set<ConnectPoint>> prevSinks) {
Pier7b657162018-03-27 11:29:42 -0700960 // Iterate over the sinks in order to build the set
961 // of the connect points to be served by this group
962 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
Pier28164682018-04-17 15:50:43 +0200963 newSinks.forEach((hostId, connectPoints) -> {
Pier7b657162018-03-27 11:29:42 -0700964 // If it has more than 1 locations
965 if (connectPoints.size() > 1 || connectPoints.size() == 0) {
966 log.debug("Skip {} since sink {} has {} locations",
967 mcastIp, hostId, connectPoints.size());
968 return;
969 }
Pier28164682018-04-17 15:50:43 +0200970 // If previously it had two locations, we need to recover it
971 // Filter out if the remaining location is already served
972 if (prevSinks.containsKey(hostId) && prevSinks.get(hostId).size() == 2) {
973 sinksToBeProcessed.add(connectPoints.stream()
974 .filter(connectPoint -> !isSink(mcastIp, connectPoint))
975 .findFirst().orElseGet(null));
976 }
Pier7b657162018-03-27 11:29:42 -0700977 });
978 return sinksToBeProcessed;
979 }
980
981 /**
982 * Process all the sinks related to a mcast group and return
983 * the ones to be processed.
984 *
985 * @param source the source connect point
986 * @param mcastIp the group address
987 * @param sinks the sinks to be evaluated
988 * @return the set of the sinks to be processed
989 */
990 private Set<ConnectPoint> processSinksToBeAdded(ConnectPoint source, IpAddress mcastIp,
991 Map<HostId, Set<ConnectPoint>> sinks) {
992 // Iterate over the sinks in order to build the set
993 // of the connect points to be served by this group
994 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
995 sinks.forEach(((hostId, connectPoints) -> {
996 // If it has more than 2 locations
997 if (connectPoints.size() > 2 || connectPoints.size() == 0) {
998 log.debug("Skip {} since sink {} has {} locations",
999 mcastIp, hostId, connectPoints.size());
1000 return;
1001 }
1002 // If it has one location, just use it
1003 if (connectPoints.size() == 1) {
1004 sinksToBeProcessed.add(connectPoints.stream()
1005 .findFirst().orElseGet(null));
1006 return;
1007 }
1008 // We prefer to reuse existing flows
1009 ConnectPoint sinkToBeProcessed = connectPoints.stream()
Pier28164682018-04-17 15:50:43 +02001010 .filter(connectPoint -> isSink(mcastIp, connectPoint))
Pier7b657162018-03-27 11:29:42 -07001011 .findFirst().orElse(null);
1012 if (sinkToBeProcessed != null) {
1013 sinksToBeProcessed.add(sinkToBeProcessed);
1014 return;
1015 }
1016 // Otherwise we prefer to reuse existing egresses
1017 Set<DeviceId> egresses = getDevice(mcastIp, EGRESS);
1018 sinkToBeProcessed = connectPoints.stream()
Pier28164682018-04-17 15:50:43 +02001019 .filter(connectPoint -> egresses.contains(connectPoint.deviceId()))
Pier7b657162018-03-27 11:29:42 -07001020 .findFirst().orElse(null);
1021 if (sinkToBeProcessed != null) {
1022 sinksToBeProcessed.add(sinkToBeProcessed);
1023 return;
1024 }
1025 // Otherwise we prefer a location co-located with the source (if it exists)
1026 sinkToBeProcessed = connectPoints.stream()
1027 .filter(connectPoint -> connectPoint.deviceId().equals(source.deviceId()))
1028 .findFirst().orElse(null);
1029 if (sinkToBeProcessed != null) {
1030 sinksToBeProcessed.add(sinkToBeProcessed);
1031 return;
1032 }
1033 // Finally, we randomly pick a new location
1034 sinksToBeProcessed.add(connectPoints.stream()
1035 .findFirst().orElseGet(null));
1036 }));
1037 // We have done, return the set
1038 return sinksToBeProcessed;
1039 }
1040
1041 /**
Pier1a7e0c02018-03-12 15:00:54 -07001042 * Utility method to remove all the ingress transit ports.
1043 *
1044 * @param mcastIp the group ip
1045 * @param ingressDevice the ingress device for this group
1046 * @param source the source connect point
1047 */
1048 private void removeIngressTransitPorts(IpAddress mcastIp, DeviceId ingressDevice,
1049 ConnectPoint source) {
1050 Set<PortNumber> ingressTransitPorts = ingressTransitPort(mcastIp);
1051 ingressTransitPorts.forEach(ingressTransitPort -> {
1052 if (ingressTransitPort != null) {
1053 boolean isLast = removePortFromDevice(ingressDevice, ingressTransitPort,
Pier7b657162018-03-27 11:29:42 -07001054 mcastIp, mcastUtils.assignedVlan(source));
Pier1a7e0c02018-03-12 15:00:54 -07001055 if (isLast) {
1056 mcastRoleStore.remove(new McastStoreKey(mcastIp, ingressDevice));
1057 }
1058 }
1059 });
1060 }
1061
1062 /**
Charles Chanc91c8782016-03-30 17:54:24 -07001063 * Adds a port to given multicast group on given device. This involves the
1064 * update of L3 multicast group and multicast routing table entry.
1065 *
1066 * @param deviceId device ID
1067 * @param port port to be added
1068 * @param mcastIp multicast group
1069 * @param assignedVlan assigned VLAN ID
1070 */
1071 private void addPortToDevice(DeviceId deviceId, PortNumber port,
Pier7b657162018-03-27 11:29:42 -07001072 IpAddress mcastIp, VlanId assignedVlan) {
Charles Chan72779502016-04-23 17:36:10 -07001073 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId);
Charles Chanc91c8782016-03-30 17:54:24 -07001074 ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
Pier Luigi4f0dd212018-01-19 10:24:53 +01001075 NextObjective newNextObj;
Charles Chan72779502016-04-23 17:36:10 -07001076 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Charles Chanc91c8782016-03-30 17:54:24 -07001077 // First time someone request this mcast group via this device
1078 portBuilder.add(port);
Pier Luigi4f0dd212018-01-19 10:24:53 +01001079 // New nextObj
Pier7b657162018-03-27 11:29:42 -07001080 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi4f0dd212018-01-19 10:24:53 +01001081 portBuilder.build(), null).add();
1082 // Store the new port
1083 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Charles Chanc91c8782016-03-30 17:54:24 -07001084 } else {
1085 // This device already serves some subscribers of this mcast group
Charles Chan72779502016-04-23 17:36:10 -07001086 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chanc91c8782016-03-30 17:54:24 -07001087 // Stop if the port is already in the nextobj
Pier7b657162018-03-27 11:29:42 -07001088 Set<PortNumber> existingPorts = mcastUtils.getPorts(nextObj.next());
Charles Chanc91c8782016-03-30 17:54:24 -07001089 if (existingPorts.contains(port)) {
1090 log.info("NextObj for {}/{} already exists. Abort", deviceId, port);
1091 return;
1092 }
Pier Luigi4f0dd212018-01-19 10:24:53 +01001093 // Let's add the port and reuse the previous one
Yuta HIGUCHIbef07b52018-02-09 18:05:23 -08001094 portBuilder.addAll(existingPorts).add(port);
Pier Luigi4f0dd212018-01-19 10:24:53 +01001095 // Reuse previous nextObj
Pier7b657162018-03-27 11:29:42 -07001096 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi4f0dd212018-01-19 10:24:53 +01001097 portBuilder.build(), nextObj.id()).addToExisting();
1098 // Store the final next objective and send only the difference to the driver
1099 mcastNextObjStore.put(mcastStoreKey, newNextObj);
1100 // Add just the new port
1101 portBuilder = ImmutableSet.builder();
1102 portBuilder.add(port);
Pier7b657162018-03-27 11:29:42 -07001103 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi4f0dd212018-01-19 10:24:53 +01001104 portBuilder.build(), nextObj.id()).addToExisting();
Charles Chanc91c8782016-03-30 17:54:24 -07001105 }
1106 // Create, store and apply the new nextObj and fwdObj
Charles Chan72779502016-04-23 17:36:10 -07001107 ObjectiveContext context = new DefaultObjectiveContext(
1108 (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}",
1109 mcastIp, deviceId, port.toLong(), assignedVlan),
1110 (objective, error) ->
1111 log.warn("Failed to add {} on {}/{}, vlan {}: {}",
1112 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Pier7b657162018-03-27 11:29:42 -07001113 ForwardingObjective fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan,
1114 newNextObj.id()).add(context);
Charles Chanc91c8782016-03-30 17:54:24 -07001115 srManager.flowObjectiveService.next(deviceId, newNextObj);
1116 srManager.flowObjectiveService.forward(deviceId, fwdObj);
Charles Chanc91c8782016-03-30 17:54:24 -07001117 }
1118
1119 /**
1120 * Removes a port from given multicast group on given device.
1121 * This involves the update of L3 multicast group and multicast routing
1122 * table entry.
1123 *
1124 * @param deviceId device ID
1125 * @param port port to be added
1126 * @param mcastIp multicast group
1127 * @param assignedVlan assigned VLAN ID
1128 * @return true if this is the last sink on this device
1129 */
1130 private boolean removePortFromDevice(DeviceId deviceId, PortNumber port,
Pier7b657162018-03-27 11:29:42 -07001131 IpAddress mcastIp, VlanId assignedVlan) {
Charles Chan72779502016-04-23 17:36:10 -07001132 McastStoreKey mcastStoreKey =
1133 new McastStoreKey(mcastIp, deviceId);
Charles Chanc91c8782016-03-30 17:54:24 -07001134 // This device is not serving this multicast group
Charles Chan72779502016-04-23 17:36:10 -07001135 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Charles Chanc91c8782016-03-30 17:54:24 -07001136 log.warn("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
1137 return false;
1138 }
Charles Chan72779502016-04-23 17:36:10 -07001139 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chanc91c8782016-03-30 17:54:24 -07001140
Pier7b657162018-03-27 11:29:42 -07001141 Set<PortNumber> existingPorts = mcastUtils.getPorts(nextObj.next());
Charles Chan72779502016-04-23 17:36:10 -07001142 // This port does not serve this multicast group
Charles Chanc91c8782016-03-30 17:54:24 -07001143 if (!existingPorts.contains(port)) {
1144 log.warn("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
1145 return false;
1146 }
1147 // Copy and modify the ImmutableSet
1148 existingPorts = Sets.newHashSet(existingPorts);
1149 existingPorts.remove(port);
1150
1151 NextObjective newNextObj;
Pier Luigi8cd46de2018-01-19 10:24:53 +01001152 ObjectiveContext context;
Charles Chanc91c8782016-03-30 17:54:24 -07001153 ForwardingObjective fwdObj;
1154 if (existingPorts.isEmpty()) {
Pier Luigi8cd46de2018-01-19 10:24:53 +01001155 // If this is the last sink, remove flows and last bucket
Charles Chanc91c8782016-03-30 17:54:24 -07001156 // NOTE: Rely on GroupStore garbage collection rather than explicitly
1157 // remove L3MG since there might be other flows/groups refer to
1158 // the same L2IG
Pier Luigi8cd46de2018-01-19 10:24:53 +01001159 context = new DefaultObjectiveContext(
Charles Chan72779502016-04-23 17:36:10 -07001160 (objective) -> log.debug("Successfully remove {} on {}/{}, vlan {}",
1161 mcastIp, deviceId, port.toLong(), assignedVlan),
1162 (objective, error) ->
1163 log.warn("Failed to remove {} on {}/{}, vlan {}: {}",
1164 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Pier7b657162018-03-27 11:29:42 -07001165 fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
Charles Chan72779502016-04-23 17:36:10 -07001166 mcastNextObjStore.remove(mcastStoreKey);
Charles Chanc91c8782016-03-30 17:54:24 -07001167 } else {
1168 // If this is not the last sink, update flows and groups
Pier Luigi8cd46de2018-01-19 10:24:53 +01001169 context = new DefaultObjectiveContext(
Charles Chan72779502016-04-23 17:36:10 -07001170 (objective) -> log.debug("Successfully update {} on {}/{}, vlan {}",
1171 mcastIp, deviceId, port.toLong(), assignedVlan),
1172 (objective, error) ->
1173 log.warn("Failed to update {} on {}/{}, vlan {}: {}",
1174 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Pier Luigi8cd46de2018-01-19 10:24:53 +01001175 // Here we store the next objective with the remaining port
Pier7b657162018-03-27 11:29:42 -07001176 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi8cd46de2018-01-19 10:24:53 +01001177 existingPorts, nextObj.id()).removeFromExisting();
Pier7b657162018-03-27 11:29:42 -07001178 fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add(context);
Charles Chan72779502016-04-23 17:36:10 -07001179 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Charles Chanc91c8782016-03-30 17:54:24 -07001180 }
Pier Luigi8cd46de2018-01-19 10:24:53 +01001181 // Let's modify the next objective removing the bucket
Pier7b657162018-03-27 11:29:42 -07001182 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi8cd46de2018-01-19 10:24:53 +01001183 ImmutableSet.of(port), nextObj.id()).removeFromExisting();
1184 srManager.flowObjectiveService.next(deviceId, newNextObj);
1185 srManager.flowObjectiveService.forward(deviceId, fwdObj);
Charles Chanc91c8782016-03-30 17:54:24 -07001186 return existingPorts.isEmpty();
1187 }
1188
Charles Chan72779502016-04-23 17:36:10 -07001189 /**
1190 * Removes entire group on given device.
1191 *
1192 * @param deviceId device ID
1193 * @param mcastIp multicast group to be removed
1194 * @param assignedVlan assigned VLAN ID
1195 */
1196 private void removeGroupFromDevice(DeviceId deviceId, IpAddress mcastIp,
Pier7b657162018-03-27 11:29:42 -07001197 VlanId assignedVlan) {
Charles Chan72779502016-04-23 17:36:10 -07001198 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId);
1199 // This device is not serving this multicast group
1200 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
1201 log.warn("{} is not serving {}. Abort.", deviceId, mcastIp);
1202 return;
1203 }
1204 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
1205 // NOTE: Rely on GroupStore garbage collection rather than explicitly
1206 // remove L3MG since there might be other flows/groups refer to
1207 // the same L2IG
1208 ObjectiveContext context = new DefaultObjectiveContext(
1209 (objective) -> log.debug("Successfully remove {} on {}, vlan {}",
1210 mcastIp, deviceId, assignedVlan),
1211 (objective, error) ->
1212 log.warn("Failed to remove {} on {}, vlan {}: {}",
1213 mcastIp, deviceId, assignedVlan, error));
Pier7b657162018-03-27 11:29:42 -07001214 ForwardingObjective fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
Charles Chan72779502016-04-23 17:36:10 -07001215 srManager.flowObjectiveService.forward(deviceId, fwdObj);
1216 mcastNextObjStore.remove(mcastStoreKey);
1217 mcastRoleStore.remove(mcastStoreKey);
1218 }
1219
Pier Luigi580fd8a2018-01-16 10:47:50 +01001220 private void installPath(IpAddress mcastIp, ConnectPoint source, Path mcastPath) {
1221 // Get Links
1222 List<Link> links = mcastPath.links();
Pier1a7e0c02018-03-12 15:00:54 -07001223
1224 // Setup new ingress mcast role
1225 mcastRoleStore.put(new McastStoreKey(mcastIp, links.get(0).src().deviceId()),
1226 INGRESS);
1227
Pier Luigi580fd8a2018-01-16 10:47:50 +01001228 // For each link, modify the next on the source device adding the src port
1229 // and a new filter objective on the destination port
1230 links.forEach(link -> {
1231 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
Pier7b657162018-03-27 11:29:42 -07001232 mcastUtils.assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
1233 mcastUtils.addFilterToDevice(link.dst().deviceId(), link.dst().port(),
1234 mcastUtils.assignedVlan(null), mcastIp, null);
Pier Luigi580fd8a2018-01-16 10:47:50 +01001235 });
Pier1a7e0c02018-03-12 15:00:54 -07001236
1237 // Setup mcast role for the transit
1238 links.stream()
1239 .filter(link -> !link.src().deviceId().equals(source.deviceId()))
1240 .forEach(link -> mcastRoleStore.put(new McastStoreKey(mcastIp, link.src().deviceId()),
1241 TRANSIT));
Charles Chan72779502016-04-23 17:36:10 -07001242 }
1243
Charles Chanc91c8782016-03-30 17:54:24 -07001244 /**
Pier1f87aca2018-03-14 16:47:32 -07001245 * Go through all the paths, looking for shared links to be used
1246 * in the final path computation.
1247 *
1248 * @param egresses egress devices
1249 * @param availablePaths all the available paths towards the egress
1250 * @return shared links between egress devices
1251 */
1252 private Set<Link> exploreMcastTree(Set<DeviceId> egresses,
1253 Map<DeviceId, List<Path>> availablePaths) {
1254 // Length of the shortest path
1255 int minLength = Integer.MAX_VALUE;
1256 int length;
1257 // Current paths
1258 List<Path> currentPaths;
1259 // Verify the source can still reach all the egresses
1260 for (DeviceId egress : egresses) {
1261 // From the source we cannot reach all the sinks
Pier7b657162018-03-27 11:29:42 -07001262 // just continue and let's figure out after
Pier1f87aca2018-03-14 16:47:32 -07001263 currentPaths = availablePaths.get(egress);
1264 if (currentPaths.isEmpty()) {
1265 continue;
1266 }
1267 // Get the length of the first one available,
Pier7b657162018-03-27 11:29:42 -07001268 // update the min length
Pier1f87aca2018-03-14 16:47:32 -07001269 length = currentPaths.get(0).links().size();
1270 if (length < minLength) {
1271 minLength = length;
1272 }
Pier Luigi51ee7c02018-02-23 19:57:40 +01001273 }
Pier1f87aca2018-03-14 16:47:32 -07001274 // If there are no paths
1275 if (minLength == Integer.MAX_VALUE) {
1276 return Collections.emptySet();
1277 }
1278 // Iterate looking for shared links
1279 int index = 0;
1280 // Define the sets for the intersection
1281 Set<Link> sharedLinks = Sets.newHashSet();
1282 Set<Link> currentSharedLinks;
1283 Set<Link> currentLinks;
Pier7b657162018-03-27 11:29:42 -07001284 DeviceId egressToRemove = null;
Pier1f87aca2018-03-14 16:47:32 -07001285 // Let's find out the shared links
1286 while (index < minLength) {
1287 // Initialize the intersection with the paths related to the first egress
1288 currentPaths = availablePaths.get(
1289 egresses.stream()
1290 .findFirst()
1291 .orElse(null)
1292 );
1293 currentSharedLinks = Sets.newHashSet();
1294 // Iterate over the paths and take the "index" links
1295 for (Path path : currentPaths) {
1296 currentSharedLinks.add(path.links().get(index));
1297 }
1298 // Iterate over the remaining egress
1299 for (DeviceId egress : egresses) {
1300 // Iterate over the paths and take the "index" links
1301 currentLinks = Sets.newHashSet();
1302 for (Path path : availablePaths.get(egress)) {
1303 currentLinks.add(path.links().get(index));
1304 }
1305 // Do intersection
1306 currentSharedLinks = Sets.intersection(currentSharedLinks, currentLinks);
1307 // If there are no shared paths exit and record the device to remove
1308 // we have to retry with a subset of sinks
1309 if (currentSharedLinks.isEmpty()) {
Pier7b657162018-03-27 11:29:42 -07001310 egressToRemove = egress;
Pier1f87aca2018-03-14 16:47:32 -07001311 index = minLength;
1312 break;
1313 }
1314 }
1315 sharedLinks.addAll(currentSharedLinks);
1316 index++;
1317 }
1318 // If the shared links is empty and there are egress
1319 // let's retry another time with less sinks, we can
1320 // still build optimal subtrees
Pier7b657162018-03-27 11:29:42 -07001321 if (sharedLinks.isEmpty() && egresses.size() > 1 && egressToRemove != null) {
1322 egresses.remove(egressToRemove);
Pier1f87aca2018-03-14 16:47:32 -07001323 sharedLinks = exploreMcastTree(egresses, availablePaths);
1324 }
1325 return sharedLinks;
1326 }
1327
1328 /**
1329 * Build Mcast tree having as root the given source and as leaves the given egress points.
1330 *
1331 * @param source source of the tree
1332 * @param sinks leaves of the tree
1333 * @return the computed Mcast tree
1334 */
1335 private Map<ConnectPoint, List<Path>> computeSinkMcastTree(DeviceId source,
Pier7b657162018-03-27 11:29:42 -07001336 Set<ConnectPoint> sinks) {
Pier1f87aca2018-03-14 16:47:32 -07001337 // Get the egress devices, remove source from the egress if present
1338 Set<DeviceId> egresses = sinks.stream()
1339 .map(ConnectPoint::deviceId)
1340 .filter(deviceId -> !deviceId.equals(source))
1341 .collect(Collectors.toSet());
1342 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(source, egresses);
Pier7b657162018-03-27 11:29:42 -07001343 // Build final tree and return it as it is
Pier1f87aca2018-03-14 16:47:32 -07001344 final Map<ConnectPoint, List<Path>> finalTree = Maps.newHashMap();
Pier7b657162018-03-27 11:29:42 -07001345 // We need to put back the source if it was originally present
1346 sinks.forEach(sink -> {
1347 List<Path> sinkPaths = mcastTree.get(sink.deviceId());
1348 finalTree.put(sink, sinkPaths != null ? sinkPaths : ImmutableList.of());
1349 });
Pier1f87aca2018-03-14 16:47:32 -07001350 return finalTree;
1351 }
1352
1353 /**
1354 * Build Mcast tree having as root the given source and as leaves the given egress.
1355 *
1356 * @param source source of the tree
1357 * @param egresses leaves of the tree
1358 * @return the computed Mcast tree
1359 */
1360 private Map<DeviceId, List<Path>> computeMcastTree(DeviceId source,
1361 Set<DeviceId> egresses) {
1362 // Pre-compute all the paths
1363 Map<DeviceId, List<Path>> availablePaths = Maps.newHashMap();
1364 // No links to enforce
1365 egresses.forEach(egress -> availablePaths.put(egress, getPaths(source, egress,
1366 Collections.emptySet())));
1367 // Explore the topology looking for shared links amongst the egresses
1368 Set<Link> linksToEnforce = exploreMcastTree(Sets.newHashSet(egresses), availablePaths);
1369 // Remove all the paths from the previous computation
1370 availablePaths.clear();
1371 // Build the final paths enforcing the shared links between egress devices
1372 egresses.forEach(egress -> availablePaths.put(egress, getPaths(source, egress,
1373 linksToEnforce)));
1374 return availablePaths;
1375 }
1376
1377 /**
1378 * Gets path from src to dst computed using the custom link weigher.
1379 *
1380 * @param src source device ID
1381 * @param dst destination device ID
1382 * @return list of paths from src to dst
1383 */
1384 private List<Path> getPaths(DeviceId src, DeviceId dst, Set<Link> linksToEnforce) {
1385 // Takes a snapshot of the topology
1386 final Topology currentTopology = topologyService.currentTopology();
1387 // Build a specific link weigher for this path computation
1388 final LinkWeigher linkWeigher = new SRLinkWeigher(srManager, src, linksToEnforce);
1389 // We will use our custom link weigher for our path
1390 // computations and build the list of valid paths
1391 List<Path> allPaths = Lists.newArrayList(
1392 topologyService.getPaths(currentTopology, src, dst, linkWeigher)
1393 );
1394 // If there are no valid paths, just exit
1395 log.debug("{} path(s) found from {} to {}", allPaths.size(), src, dst);
1396 return allPaths;
Pier Luigi51ee7c02018-02-23 19:57:40 +01001397 }
1398
Charles Chanc91c8782016-03-30 17:54:24 -07001399 /**
1400 * Gets a path from src to dst.
1401 * If a path was allocated before, returns the allocated path.
1402 * Otherwise, randomly pick one from available paths.
1403 *
1404 * @param src source device ID
1405 * @param dst destination device ID
1406 * @param mcastIp multicast group
Pier1f87aca2018-03-14 16:47:32 -07001407 * @param allPaths paths list
Charles Chanc91c8782016-03-30 17:54:24 -07001408 * @return an optional path from src to dst
1409 */
Pier1f87aca2018-03-14 16:47:32 -07001410 private Optional<Path> getPath(DeviceId src, DeviceId dst,
1411 IpAddress mcastIp, List<Path> allPaths) {
1412 // Firstly we get all the valid paths, if the supplied are null
1413 if (allPaths == null) {
1414 allPaths = getPaths(src, dst, Collections.emptySet());
1415 }
1416
1417 // If there are no paths just exit
Charles Chanc91c8782016-03-30 17:54:24 -07001418 if (allPaths.isEmpty()) {
Charles Chanc91c8782016-03-30 17:54:24 -07001419 return Optional.empty();
1420 }
1421
Pier Luigi91573e12018-01-23 16:06:38 +01001422 // Create a map index of suitablity-to-list of paths. For example
1423 // a path in the list associated to the index 1 shares only the
1424 // first hop and it is less suitable of a path belonging to the index
1425 // 2 that shares leaf-spine.
1426 Map<Integer, List<Path>> eligiblePaths = Maps.newHashMap();
1427 // Some init steps
1428 int nhop;
1429 McastStoreKey mcastStoreKey;
1430 Link hop;
1431 PortNumber srcPort;
1432 Set<PortNumber> existingPorts;
1433 NextObjective nextObj;
1434 // Iterate over paths looking for eligible paths
1435 for (Path path : allPaths) {
1436 // Unlikely, it will happen...
1437 if (!src.equals(path.links().get(0).src().deviceId())) {
1438 continue;
1439 }
1440 nhop = 0;
1441 // Iterate over the links
1442 while (nhop < path.links().size()) {
1443 // Get the link and verify if a next related
1444 // to the src device exist in the store
1445 hop = path.links().get(nhop);
1446 mcastStoreKey = new McastStoreKey(mcastIp, hop.src().deviceId());
1447 // It does not exist in the store, exit
1448 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
1449 break;
Charles Chanc91c8782016-03-30 17:54:24 -07001450 }
Pier Luigi91573e12018-01-23 16:06:38 +01001451 // Get the output ports on the next
1452 nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Pier7b657162018-03-27 11:29:42 -07001453 existingPorts = mcastUtils.getPorts(nextObj.next());
Pier Luigi91573e12018-01-23 16:06:38 +01001454 // And the src port on the link
1455 srcPort = hop.src().port();
1456 // the src port is not used as output, exit
1457 if (!existingPorts.contains(srcPort)) {
1458 break;
1459 }
1460 nhop++;
1461 }
1462 // n_hop defines the index
1463 if (nhop > 0) {
1464 eligiblePaths.compute(nhop, (index, paths) -> {
1465 paths = paths == null ? Lists.newArrayList() : paths;
1466 paths.add(path);
1467 return paths;
1468 });
Charles Chanc91c8782016-03-30 17:54:24 -07001469 }
1470 }
Pier Luigi91573e12018-01-23 16:06:38 +01001471
1472 // No suitable paths
1473 if (eligiblePaths.isEmpty()) {
1474 log.debug("No eligiblePath(s) found from {} to {}", src, dst);
1475 // Otherwise, randomly pick a path
1476 Collections.shuffle(allPaths);
1477 return allPaths.stream().findFirst();
1478 }
1479
1480 // Let's take the best ones
1481 Integer bestIndex = eligiblePaths.keySet()
1482 .stream()
1483 .sorted(Comparator.reverseOrder())
1484 .findFirst().orElse(null);
1485 List<Path> bestPaths = eligiblePaths.get(bestIndex);
1486 log.debug("{} eligiblePath(s) found from {} to {}",
1487 bestPaths.size(), src, dst);
1488 // randomly pick a path on the highest index
1489 Collections.shuffle(bestPaths);
1490 return bestPaths.stream().findFirst();
Charles Chanc91c8782016-03-30 17:54:24 -07001491 }
1492
1493 /**
Charles Chan72779502016-04-23 17:36:10 -07001494 * Gets device(s) of given role in given multicast group.
1495 *
1496 * @param mcastIp multicast IP
1497 * @param role multicast role
1498 * @return set of device ID or empty set if not found
1499 */
1500 private Set<DeviceId> getDevice(IpAddress mcastIp, McastRole role) {
1501 return mcastRoleStore.entrySet().stream()
1502 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
1503 entry.getValue().value() == role)
Pier1f87aca2018-03-14 16:47:32 -07001504 .map(Entry::getKey).map(McastStoreKey::deviceId)
Charles Chan72779502016-04-23 17:36:10 -07001505 .collect(Collectors.toSet());
1506 }
1507
1508 /**
1509 * Gets groups which is affected by the link down event.
1510 *
1511 * @param link link going down
1512 * @return a set of multicast IpAddress
1513 */
1514 private Set<IpAddress> getAffectedGroups(Link link) {
1515 DeviceId deviceId = link.src().deviceId();
1516 PortNumber port = link.src().port();
1517 return mcastNextObjStore.entrySet().stream()
1518 .filter(entry -> entry.getKey().deviceId().equals(deviceId) &&
Pier7b657162018-03-27 11:29:42 -07001519 mcastUtils.getPorts(entry.getValue().value().next()).contains(port))
Pier1f87aca2018-03-14 16:47:32 -07001520 .map(Entry::getKey).map(McastStoreKey::mcastIp)
Charles Chan72779502016-04-23 17:36:10 -07001521 .collect(Collectors.toSet());
1522 }
1523
1524 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +01001525 * Gets groups which are affected by the device down event.
1526 *
1527 * @param deviceId device going down
1528 * @return a set of multicast IpAddress
1529 */
1530 private Set<IpAddress> getAffectedGroups(DeviceId deviceId) {
1531 return mcastNextObjStore.entrySet().stream()
1532 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
Pier1f87aca2018-03-14 16:47:32 -07001533 .map(Entry::getKey).map(McastStoreKey::mcastIp)
Pier Luigi580fd8a2018-01-16 10:47:50 +01001534 .collect(Collectors.toSet());
1535 }
1536
1537 /**
Charles Chan72779502016-04-23 17:36:10 -07001538 * Gets the spine-facing port on ingress device of given multicast group.
1539 *
1540 * @param mcastIp multicast IP
1541 * @return spine-facing port on ingress device
1542 */
Pier1a7e0c02018-03-12 15:00:54 -07001543 private Set<PortNumber> ingressTransitPort(IpAddress mcastIp) {
Pier979e61a2018-03-07 11:42:50 +01001544 DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
Charles Chan72779502016-04-23 17:36:10 -07001545 .stream().findAny().orElse(null);
Pier1a7e0c02018-03-12 15:00:54 -07001546 ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
Charles Chan72779502016-04-23 17:36:10 -07001547 if (ingressDevice != null) {
1548 NextObjective nextObj = mcastNextObjStore
1549 .get(new McastStoreKey(mcastIp, ingressDevice)).value();
Pier7b657162018-03-27 11:29:42 -07001550 Set<PortNumber> ports = mcastUtils.getPorts(nextObj.next());
Pier1a7e0c02018-03-12 15:00:54 -07001551 // Let's find out all the ingress-transit ports
Charles Chan72779502016-04-23 17:36:10 -07001552 for (PortNumber port : ports) {
1553 // Spine-facing port should have no subnet and no xconnect
Pier Luigi69f774d2018-02-28 12:10:50 +01001554 if (srManager.deviceConfiguration() != null &&
1555 srManager.deviceConfiguration().getPortSubnets(ingressDevice, port).isEmpty() &&
Charles Chan82f19972016-05-17 13:13:55 -07001556 !srManager.xConnectHandler.hasXConnect(new ConnectPoint(ingressDevice, port))) {
Pier1a7e0c02018-03-12 15:00:54 -07001557 portBuilder.add(port);
Charles Chan72779502016-04-23 17:36:10 -07001558 }
1559 }
1560 }
Pier1a7e0c02018-03-12 15:00:54 -07001561 return portBuilder.build();
Charles Chan72779502016-04-23 17:36:10 -07001562 }
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001563
1564 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +01001565 * Verify if the given device has sinks
1566 * for the multicast group.
1567 *
1568 * @param deviceId device Id
1569 * @param mcastIp multicast IP
1570 * @return true if the device has sink for the group.
1571 * False otherwise.
1572 */
1573 private boolean hasSinks(DeviceId deviceId, IpAddress mcastIp) {
1574 if (deviceId != null) {
1575 // Get the nextobjective
1576 Versioned<NextObjective> versionedNextObj = mcastNextObjStore.get(
1577 new McastStoreKey(mcastIp, deviceId)
1578 );
1579 // If it exists
1580 if (versionedNextObj != null) {
1581 NextObjective nextObj = versionedNextObj.value();
1582 // Retrieves all the output ports
Pier7b657162018-03-27 11:29:42 -07001583 Set<PortNumber> ports = mcastUtils.getPorts(nextObj.next());
Pier Luigi580fd8a2018-01-16 10:47:50 +01001584 // Tries to find at least one port that is not spine-facing
1585 for (PortNumber port : ports) {
1586 // Spine-facing port should have no subnet and no xconnect
Pier Luigi69f774d2018-02-28 12:10:50 +01001587 if (srManager.deviceConfiguration() != null &&
1588 (!srManager.deviceConfiguration().getPortSubnets(deviceId, port).isEmpty() ||
Pier Luigi580fd8a2018-01-16 10:47:50 +01001589 srManager.xConnectHandler.hasXConnect(new ConnectPoint(deviceId, port)))) {
1590 return true;
1591 }
1592 }
1593 }
1594 }
1595 return false;
1596 }
1597
1598 /**
Pier28164682018-04-17 15:50:43 +02001599 * Verify if a given connect point is sink for this group.
1600 *
1601 * @param mcastIp group address
1602 * @param connectPoint connect point to be verified
1603 * @return true if the connect point is sink of the group
1604 */
1605 private boolean isSink(IpAddress mcastIp, ConnectPoint connectPoint) {
1606 // Let's check if we are already serving that location
1607 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, connectPoint.deviceId());
1608 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
1609 return false;
1610 }
1611 // Get next and check with the port
1612 NextObjective mcastNext = mcastNextObjStore.get(mcastStoreKey).value();
1613 return mcastUtils.getPorts(mcastNext.next()).contains(connectPoint.port());
1614 }
1615
1616 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +01001617 * Updates filtering objective for given device and port.
1618 * It is called in general when the mcast config has been
1619 * changed.
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001620 *
1621 * @param deviceId device ID
1622 * @param portNum ingress port number
1623 * @param vlanId assigned VLAN ID
1624 * @param install true to add, false to remove
1625 */
Pier Luigi69f774d2018-02-28 12:10:50 +01001626 public void updateFilterToDevice(DeviceId deviceId, PortNumber portNum,
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001627 VlanId vlanId, boolean install) {
Pier Luigi35dab3f2018-01-25 16:16:02 +01001628 lastMcastChange = Instant.now();
1629 mcastLock();
1630 try {
1631 // Iterates over the route and updates properly the filtering objective
1632 // on the source device.
1633 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
Pierdb27b8d2018-04-17 16:29:56 +02001634 log.debug("Update filter for {}", mcastRoute.group());
1635 // Verify leadership on the operation
1636 if (!mcastUtils.isLeader(mcastRoute.group())) {
1637 log.debug("Skip {} due to lack of leadership", mcastRoute.group());
1638 return;
1639 }
Pier1f87aca2018-03-14 16:47:32 -07001640 // FIXME To be addressed with multiple sources support
1641 ConnectPoint source = srManager.multicastRouteService.sources(mcastRoute)
1642 .stream()
1643 .findFirst().orElse(null);
Pier Luigi35dab3f2018-01-25 16:16:02 +01001644 if (source.deviceId().equals(deviceId) && source.port().equals(portNum)) {
1645 if (install) {
Pier7b657162018-03-27 11:29:42 -07001646 mcastUtils.addFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), INGRESS);
Pier Luigi35dab3f2018-01-25 16:16:02 +01001647 } else {
Pier7b657162018-03-27 11:29:42 -07001648 mcastUtils.removeFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), null);
Pier Luigi35dab3f2018-01-25 16:16:02 +01001649 }
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001650 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001651 });
1652 } finally {
1653 mcastUnlock();
1654 }
1655 }
1656
1657 /**
1658 * Performs bucket verification operation for all mcast groups in the devices.
1659 * Firstly, it verifies that mcast is stable before trying verification operation.
1660 * Verification consists in creating new nexts with VERIFY operation. Actually,
1661 * the operation is totally delegated to the driver.
1662 */
1663 private final class McastBucketCorrector implements Runnable {
1664
1665 @Override
1666 public void run() {
1667 // Verify if the Mcast has been stable for MCAST_STABLITY_THRESHOLD
1668 if (!isMcastStable()) {
1669 return;
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001670 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001671 // Acquires lock
1672 mcastLock();
1673 try {
1674 // Iterates over the routes and verify the related next objectives
1675 srManager.multicastRouteService.getRoutes()
1676 .stream()
1677 .map(McastRoute::group)
1678 .forEach(mcastIp -> {
1679 log.trace("Running mcast buckets corrector for mcast group: {}",
1680 mcastIp);
1681
1682 // For each group we get current information in the store
1683 // and issue a check of the next objectives in place
Pier979e61a2018-03-07 11:42:50 +01001684 DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
Pier Luigi35dab3f2018-01-25 16:16:02 +01001685 .stream().findAny().orElse(null);
Pier1a7e0c02018-03-12 15:00:54 -07001686 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT);
Pier979e61a2018-03-07 11:42:50 +01001687 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS);
Pier Luigi92e69be2018-03-02 12:53:37 +01001688 // Get source and sinks from Mcast Route Service and warn about errors
Pier7b657162018-03-27 11:29:42 -07001689 ConnectPoint source = mcastUtils.getSource(mcastIp);
1690 Set<ConnectPoint> sinks = mcastUtils.getSinks(mcastIp).values().stream()
1691 .flatMap(Collection::stream)
1692 .collect(Collectors.toSet());
Pier Luigi35dab3f2018-01-25 16:16:02 +01001693
1694 // Do not proceed if ingress device or source of this group are missing
1695 if (ingressDevice == null || source == null) {
Pier Luigi92e69be2018-03-02 12:53:37 +01001696 if (!sinks.isEmpty()) {
1697 log.warn("Unable to run buckets corrector. " +
1698 "Missing ingress {} or source {} for group {}",
1699 ingressDevice, source, mcastIp);
1700 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001701 return;
1702 }
1703
Pierdb27b8d2018-04-17 16:29:56 +02001704 // Continue only when this instance is the leader of the group
1705 if (!mcastUtils.isLeader(mcastIp)) {
Pier Luigi35dab3f2018-01-25 16:16:02 +01001706 log.trace("Unable to run buckets corrector. " +
Pierdb27b8d2018-04-17 16:29:56 +02001707 "Skip {} due to lack of leadership", mcastIp);
Pier Luigi35dab3f2018-01-25 16:16:02 +01001708 return;
1709 }
1710
1711 // Create the set of the devices to be processed
1712 ImmutableSet.Builder<DeviceId> devicesBuilder = ImmutableSet.builder();
1713 devicesBuilder.add(ingressDevice);
Pier1a7e0c02018-03-12 15:00:54 -07001714 if (!transitDevices.isEmpty()) {
1715 devicesBuilder.addAll(transitDevices);
Pier Luigi35dab3f2018-01-25 16:16:02 +01001716 }
1717 if (!egressDevices.isEmpty()) {
1718 devicesBuilder.addAll(egressDevices);
1719 }
1720 Set<DeviceId> devicesToProcess = devicesBuilder.build();
1721
1722 // Iterate over the devices
1723 devicesToProcess.forEach(deviceId -> {
1724 McastStoreKey currentKey = new McastStoreKey(mcastIp, deviceId);
1725 // If next exists in our store verify related next objective
1726 if (mcastNextObjStore.containsKey(currentKey)) {
1727 NextObjective currentNext = mcastNextObjStore.get(currentKey).value();
1728 // Get current ports
Pier7b657162018-03-27 11:29:42 -07001729 Set<PortNumber> currentPorts = mcastUtils.getPorts(currentNext.next());
Pier Luigi35dab3f2018-01-25 16:16:02 +01001730 // Rebuild the next objective
Pier7b657162018-03-27 11:29:42 -07001731 currentNext = mcastUtils.nextObjBuilder(
Pier Luigi35dab3f2018-01-25 16:16:02 +01001732 mcastIp,
Pier7b657162018-03-27 11:29:42 -07001733 mcastUtils.assignedVlan(deviceId.equals(source.deviceId()) ?
1734 source : null),
Pier Luigi35dab3f2018-01-25 16:16:02 +01001735 currentPorts,
1736 currentNext.id()
1737 ).verify();
1738 // Send to the flowobjective service
1739 srManager.flowObjectiveService.next(deviceId, currentNext);
1740 } else {
Pier Luigid8a15162018-02-15 16:33:08 +01001741 log.warn("Unable to run buckets corrector. " +
Pier Luigi35dab3f2018-01-25 16:16:02 +01001742 "Missing next for {} and group {}",
1743 deviceId, mcastIp);
1744 }
1745 });
1746
1747 });
1748 } finally {
1749 // Finally, it releases the lock
1750 mcastUnlock();
1751 }
1752
1753 }
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001754 }
Pier Luigi0f9635b2018-01-15 18:06:43 +01001755
1756 public Map<McastStoreKey, Integer> getMcastNextIds(IpAddress mcastIp) {
1757 // If mcast ip is present
1758 if (mcastIp != null) {
1759 return mcastNextObjStore.entrySet().stream()
1760 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
Pier1f87aca2018-03-14 16:47:32 -07001761 .collect(Collectors.toMap(Entry::getKey,
Pier Luigi0f9635b2018-01-15 18:06:43 +01001762 entry -> entry.getValue().value().id()));
1763 }
1764 // Otherwise take all the groups
1765 return mcastNextObjStore.entrySet().stream()
Pier1f87aca2018-03-14 16:47:32 -07001766 .collect(Collectors.toMap(Entry::getKey,
Pier Luigi0f9635b2018-01-15 18:06:43 +01001767 entry -> entry.getValue().value().id()));
1768 }
1769
Pier Luigi69f774d2018-02-28 12:10:50 +01001770 public Map<McastStoreKey, McastRole> getMcastRoles(IpAddress mcastIp) {
Pier Luigi0f9635b2018-01-15 18:06:43 +01001771 // If mcast ip is present
1772 if (mcastIp != null) {
1773 return mcastRoleStore.entrySet().stream()
1774 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
Pier1f87aca2018-03-14 16:47:32 -07001775 .collect(Collectors.toMap(Entry::getKey,
Pier Luigi0f9635b2018-01-15 18:06:43 +01001776 entry -> entry.getValue().value()));
1777 }
1778 // Otherwise take all the groups
1779 return mcastRoleStore.entrySet().stream()
Pier1f87aca2018-03-14 16:47:32 -07001780 .collect(Collectors.toMap(Entry::getKey,
Pier Luigi0f9635b2018-01-15 18:06:43 +01001781 entry -> entry.getValue().value()));
1782 }
1783
1784 public Map<ConnectPoint, List<ConnectPoint>> getMcastPaths(IpAddress mcastIp) {
1785 Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
1786 // Get the source
Pier7b657162018-03-27 11:29:42 -07001787 ConnectPoint source = mcastUtils.getSource(mcastIp);
Pier Luigi0f9635b2018-01-15 18:06:43 +01001788 // Source cannot be null, we don't know the starting point
1789 if (source != null) {
1790 // Init steps
1791 Set<DeviceId> visited = Sets.newHashSet();
1792 List<ConnectPoint> currentPath = Lists.newArrayList(
1793 source
1794 );
1795 // Build recursively the mcast paths
1796 buildMcastPaths(source.deviceId(), visited, mcastPaths, currentPath, mcastIp);
1797 }
1798 return mcastPaths;
1799 }
1800
1801 private void buildMcastPaths(DeviceId toVisit, Set<DeviceId> visited,
1802 Map<ConnectPoint, List<ConnectPoint>> mcastPaths,
1803 List<ConnectPoint> currentPath, IpAddress mcastIp) {
1804 // If we have visited the node to visit
1805 // there is a loop
1806 if (visited.contains(toVisit)) {
1807 return;
1808 }
1809 // Visit next-hop
1810 visited.add(toVisit);
1811 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, toVisit);
1812 // Looking for next-hops
1813 if (mcastNextObjStore.containsKey(mcastStoreKey)) {
1814 // Build egress connectpoints
1815 NextObjective nextObjective = mcastNextObjStore.get(mcastStoreKey).value();
1816 // Get Ports
Pier7b657162018-03-27 11:29:42 -07001817 Set<PortNumber> outputPorts = mcastUtils.getPorts(nextObjective.next());
Pier Luigi0f9635b2018-01-15 18:06:43 +01001818 // Build relative cps
1819 ImmutableSet.Builder<ConnectPoint> cpBuilder = ImmutableSet.builder();
1820 outputPorts.forEach(portNumber -> cpBuilder.add(new ConnectPoint(toVisit, portNumber)));
1821 Set<ConnectPoint> egressPoints = cpBuilder.build();
1822 // Define other variables for the next steps
1823 Set<Link> egressLinks;
1824 List<ConnectPoint> newCurrentPath;
1825 Set<DeviceId> newVisited;
1826 DeviceId newToVisit;
1827 for (ConnectPoint egressPoint : egressPoints) {
1828 egressLinks = srManager.linkService.getEgressLinks(egressPoint);
1829 // If it does not have egress links, stop
1830 if (egressLinks.isEmpty()) {
1831 // Add the connect points to the path
1832 newCurrentPath = Lists.newArrayList(currentPath);
1833 newCurrentPath.add(0, egressPoint);
1834 // Save in the map
1835 mcastPaths.put(egressPoint, newCurrentPath);
1836 } else {
1837 newVisited = Sets.newHashSet(visited);
1838 // Iterate over the egress links for the next hops
1839 for (Link egressLink : egressLinks) {
1840 // Update to visit
1841 newToVisit = egressLink.dst().deviceId();
1842 // Add the connect points to the path
1843 newCurrentPath = Lists.newArrayList(currentPath);
1844 newCurrentPath.add(0, egressPoint);
1845 newCurrentPath.add(0, egressLink.dst());
1846 // Go to the next hop
1847 buildMcastPaths(newToVisit, newVisited, mcastPaths, newCurrentPath, mcastIp);
1848 }
1849 }
1850 }
1851 }
1852 }
1853
Pierdb27b8d2018-04-17 16:29:56 +02001854 /**
1855 * Return the leaders of the mcast groups.
1856 *
1857 * @param mcastIp the group ip
1858 * @return the mapping group-node
1859 */
1860 public Map<IpAddress, NodeId> getMcastLeaders(IpAddress mcastIp) {
1861 return mcastUtils.getMcastLeaders(mcastIp);
1862 }
Charles Chanc91c8782016-03-30 17:54:24 -07001863}